Wire admin rules into ONNX anonymizer
This commit is contained in:
406
admin_rules.py
Normal file
406
admin_rules.py
Normal file
@@ -0,0 +1,406 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Helpers partagés pour les règles d'administration.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
import re
|
||||||
|
|
||||||
|
try:
|
||||||
|
import yaml
|
||||||
|
except Exception:
|
||||||
|
yaml = None
|
||||||
|
|
||||||
|
from config_defaults import CONFIG_DIR, deep_merge_dict
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.default.yml"
|
||||||
|
RUNTIME_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.yml"
|
||||||
|
|
||||||
|
_RUNTIME_ADMIN_RULES_OVERLAY_TEXT = """# Surcharge locale des règles d'administration.
|
||||||
|
# Ce fichier est optionnel. Les règles actives de config/admin_rules.default.yml
|
||||||
|
# restent valides tant qu'aucune surcharge locale n'est définie ici.
|
||||||
|
#
|
||||||
|
# Exemple :
|
||||||
|
# version: 1
|
||||||
|
# rules:
|
||||||
|
# - id: rule_identifier_1234567
|
||||||
|
# status: active
|
||||||
|
# governance:
|
||||||
|
# approved_by: responsable_qualite
|
||||||
|
version: 1
|
||||||
|
rules: []
|
||||||
|
"""
|
||||||
|
|
||||||
|
_FALLBACK_DEFAULT_ADMIN_RULES_DICT: dict[str, Any] = {
|
||||||
|
"version": 1,
|
||||||
|
"rules": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_non_empty_string(value: Any) -> bool:
|
||||||
|
return isinstance(value, str) and bool(value.strip())
|
||||||
|
|
||||||
|
|
||||||
|
def read_default_admin_rules_text() -> str:
    """Return the raw text of the default admin-rules YAML file.

    The file at ``DEFAULT_ADMIN_RULES_CONFIG_PATH`` is read as UTF-8. If the
    read fails for any reason, a minimal document (version 1, empty rule
    list) is returned instead of raising.
    """
    try:
        text = DEFAULT_ADMIN_RULES_CONFIG_PATH.read_text(encoding="utf-8")
    except Exception:
        # Best effort: an absent or unreadable default file must not break loading.
        text = "version: 1\nrules: []\n"
    return text
|
||||||
|
|
||||||
|
|
||||||
|
def read_runtime_admin_rules_overlay_text() -> str:
    """Return the built-in template text for the runtime overlay file.

    The text is the module constant ``_RUNTIME_ADMIN_RULES_OVERLAY_TEXT``;
    nothing is read from disk.
    """
    template = _RUNTIME_ADMIN_RULES_OVERLAY_TEXT
    return template
|
||||||
|
|
||||||
|
|
||||||
|
def load_default_admin_rules_dict() -> dict[str, Any]:
    """Parse the default admin-rules YAML into a dictionary.

    Returns:
        The parsed mapping (an empty dict when the document is empty). A
        deep copy of ``_FALLBACK_DEFAULT_ADMIN_RULES_DICT`` is returned when
        PyYAML is unavailable, when parsing raises, or when the document
        root is not a mapping.
    """
    parsed: Any = None
    if yaml is not None:
        try:
            parsed = yaml.safe_load(read_default_admin_rules_text()) or {}
        except Exception:
            # Any parsing problem falls through to the built-in fallback below.
            parsed = None
    if isinstance(parsed, dict):
        return parsed
    return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT)
|
||||||
|
|
||||||
|
|
||||||
|
def load_runtime_admin_rules_overlay_dict(path: Path | None = None) -> dict[str, Any]:
    """Parse the optional runtime overlay file into a dictionary.

    Args:
        path: Overlay file to read; ``RUNTIME_ADMIN_RULES_CONFIG_PATH`` is
            used when omitted.

    Returns:
        The parsed mapping, or an empty dict when the file does not exist,
        PyYAML is unavailable, reading/parsing raises, or the document root
        is not a mapping.
    """
    target = RUNTIME_ADMIN_RULES_CONFIG_PATH if path is None else Path(path)
    if not (target.exists() and yaml is not None):
        return {}
    try:
        parsed = yaml.safe_load(target.read_text(encoding="utf-8")) or {}
    except Exception:
        # A broken overlay is treated as "no overlay" rather than an error.
        return {}
    return parsed if isinstance(parsed, dict) else {}
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_rules_by_id(base_rules: list[dict[str, Any]], overlay_rules: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Overlay one rule list onto another, matching rules by their ``id``.

    Base rules are deep-copied first, in order. An overlay rule whose
    non-blank string ``id`` matches an already known rule is deep-merged
    into that rule in place; any other overlay mapping is appended as a deep
    copy. Overlay entries that are not mappings are ignored.

    Returns:
        A new list; neither input list is modified by this function itself.
    """
    result: list[dict[str, Any]] = []
    positions: dict[str, int] = {}

    for base_rule in base_rules:
        copied = deepcopy(base_rule)
        if isinstance(copied, dict) and _is_non_empty_string(copied.get("id")):
            # With duplicate ids in the base list, the last occurrence wins.
            positions[copied["id"]] = len(result)
        result.append(copied)

    for candidate in overlay_rules:
        if not isinstance(candidate, dict):
            continue
        candidate_id = candidate.get("id")
        has_id = _is_non_empty_string(candidate_id)
        slot = positions.get(candidate_id) if has_id else None
        if slot is not None:
            result[slot] = deep_merge_dict(result[slot], candidate)
            continue
        result.append(deepcopy(candidate))
        if has_id:
            # Remember appended rules so later overlay entries can refine them.
            positions[candidate_id] = len(result) - 1

    return result
|
||||||
|
|
||||||
|
|
||||||
|
def merge_admin_rules_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
    """Combine a base admin-rules document with an overlay document.

    Every top-level key except ``rules`` is merged with ``deep_merge_dict``.
    The ``rules`` lists are merged separately, rule by rule, keyed on ``id``.
    """
    overlay_without_rules = {key: value for key, value in overlay.items() if key != "rules"}
    merged = deep_merge_dict(base, overlay_without_rules)
    # A missing or empty ``rules`` entry counts as an empty list.
    base_rules = base.get("rules", []) or []
    overlay_rules = overlay.get("rules", []) or []
    merged["rules"] = _merge_rules_by_id(base_rules, overlay_rules)
    return merged
|
||||||
|
|
||||||
|
|
||||||
|
def load_effective_admin_rules_dict(path: Path | None = None) -> dict[str, Any]:
    """Load the default rules and apply the runtime overlay on top of them.

    Args:
        path: Overlay file location, forwarded to
            ``load_runtime_admin_rules_overlay_dict``.

    Returns:
        The merged admin-rules document.
    """
    defaults = load_default_admin_rules_dict()
    overrides = load_runtime_admin_rules_overlay_dict(path)
    return merge_admin_rules_dict(defaults, overrides)
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_runtime_admin_rules_config(path: Path | None = None) -> Path:
    """Create the runtime overlay file from the template if it is missing.

    Args:
        path: File to ensure; ``RUNTIME_ADMIN_RULES_CONFIG_PATH`` when omitted.

    Returns:
        The path of the overlay file. An existing file is left untouched.
    """
    target = RUNTIME_ADMIN_RULES_CONFIG_PATH if path is None else Path(path)
    if target.exists():
        return target
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(read_runtime_admin_rules_overlay_text(), encoding="utf-8")
    return target
|
||||||
|
|
||||||
|
|
||||||
|
def _dedupe_keep_order(values: list[str]) -> list[str]:
|
||||||
|
seen: set[str] = set()
|
||||||
|
output: list[str] = []
|
||||||
|
for value in values:
|
||||||
|
if value in seen:
|
||||||
|
continue
|
||||||
|
seen.add(value)
|
||||||
|
output.append(value)
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
|
def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]:
    """Enumerate example surface forms covered by a rule.

    Args:
        rule: One rule mapping (``type``, ``match``, optional ``normalization``).
        limit: Maximum number of variants returned for identifier rules.

    Returns:
        * ``exact_term`` / ``preserve_phrase``: the stripped ``exact_value``
          as a one-element list, or ``[]`` when it is missing or blank.
        * ``normalized_identifier``: the bare canonical value (when
          ``allow_bare_value`` is set), then every prefix/separator
          combination, plus a ``prefix\\nvalue`` form per prefix when
          ``multiline`` is set.
        * ``contextual_identifier``: every context prefix/separator
          combination, plus ``prefix\\nvalue`` and ``prefix :\\nvalue`` per
          prefix when ``multiline`` is set.
        * any other type: ``[]``.

        Identifier variants are de-duplicated in order and truncated to
        *limit*. A missing, null or blank canonical value yields ``[]``.
    """

    def _as_mapping(candidate: Any) -> dict[str, Any]:
        # Malformed YAML can put a scalar or list where a mapping is expected.
        return candidate if isinstance(candidate, dict) else {}

    def _as_text(candidate: Any) -> str:
        # YAML ``null`` must read as "no value", not as the literal word "None".
        return "" if candidate is None else str(candidate).strip()

    rule_type = rule.get("type")
    match = _as_mapping(rule.get("match"))
    normalization = _as_mapping(rule.get("normalization"))

    # Tuple membership compares with ``==`` and so tolerates unhashable types.
    if rule_type in ("exact_term", "preserve_phrase"):
        exact_value = _as_text(match.get("exact_value"))
        return [exact_value] if exact_value else []

    if rule_type not in ("normalized_identifier", "contextual_identifier"):
        return []

    canonical = _as_text(match.get("canonical_value"))
    if not canonical:
        # Without a value, only meaningless prefix-only variants would be produced.
        return []

    variants: list[str] = []
    if rule_type == "normalized_identifier":
        prefixes = normalization.get("accepted_prefixes") or []
        separators = normalization.get("prefix_value_separators") or [" "]
        multiline_joiners = ["\n"]
        if normalization.get("allow_bare_value", False):
            variants.append(canonical)
    else:
        prefixes = match.get("context_prefixes") or []
        separators = match.get("context_separators") or [": ", ":"]
        multiline_joiners = ["\n", " :\n"]

    multiline = bool(normalization.get("multiline", False))
    for prefix in prefixes:
        variants.extend(f"{prefix}{separator}{canonical}" for separator in separators)
        if multiline:
            variants.extend(f"{prefix}{joiner}{canonical}" for joiner in multiline_joiners)

    return _dedupe_keep_order(variants)[:limit]
|
||||||
|
|
||||||
|
|
||||||
|
# Allowed values for a rule's ``type`` field.
VALID_TYPES = {"exact_term", "normalized_identifier", "contextual_identifier", "preserve_phrase"}
# Allowed values for a rule's ``action`` field.
VALID_ACTIONS = {"mask", "preserve"}
# Allowed values for a rule's ``status`` field.
VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"}
# Allowed entries of ``scope.environments``.
VALID_ENVIRONMENTS = {"test", "staging", "prod"}
# Allowed entries of ``scope.sections``.
VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_allowed_choice(value: Any, allowed: set[str]) -> bool:
    """Return True when *value* is a string member of *allowed*.

    The ``isinstance`` guard keeps unhashable YAML values (lists, mappings)
    from raising ``TypeError`` during the set lookup.
    """
    return isinstance(value, str) and value in allowed


def _invalid_choices(values: list[Any], allowed: set[str]) -> list[str]:
    """Return the ``str()`` form of each entry of *values* rejected by *allowed*."""
    return [str(value) for value in values if not _is_allowed_choice(value, allowed)]


def validate_rules_config(data: dict[str, Any]) -> list[str]:
    """Check an admin-rules document and describe every problem found.

    Args:
        data: Parsed admin-rules document (``version`` plus ``rules`` list).

    Returns:
        Human-readable error messages in document order; an empty list means
        the document is valid. Nothing is raised for malformed content:
        wrong types, non-string or unhashable values are reported as errors.
        Validation stops after the top-level checks when ``rules`` is not a
        list.
    """
    errors: list[str] = []

    version = data.get("version")
    # bool is a subclass of int; ``version: true`` must not pass as version 1.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        errors.append("`version` doit etre un entier >= 1.")

    rules = data.get("rules")
    if not isinstance(rules, list):
        errors.append("`rules` doit etre une liste.")
        return errors

    seen_ids: set[str] = set()
    for index, rule in enumerate(rules):
        prefix = f"rules[{index}]"
        if not isinstance(rule, dict):
            errors.append(f"{prefix}: chaque regle doit etre un mapping.")
            continue

        # --- identity and enumerated fields ---
        rule_id = rule.get("id")
        if not _is_non_empty_string(rule_id):
            errors.append(f"{prefix}: `id` est obligatoire.")
        elif rule_id in seen_ids:
            errors.append(f"{prefix}: `id` duplique `{rule_id}`.")
        else:
            seen_ids.add(rule_id)

        if not _is_non_empty_string(rule.get("label")):
            errors.append(f"{prefix}: `label` est obligatoire.")

        rule_type = rule.get("type")
        if not _is_allowed_choice(rule_type, VALID_TYPES):
            errors.append(f"{prefix}: `type` invalide.")

        action = rule.get("action")
        if not _is_allowed_choice(action, VALID_ACTIONS):
            errors.append(f"{prefix}: `action` invalide.")

        status = rule.get("status")
        if not _is_allowed_choice(status, VALID_STATUSES):
            errors.append(f"{prefix}: `status` invalide.")

        if action == "mask" and not _is_non_empty_string(rule.get("placeholder")):
            errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.")

        # --- nested sections: report a wrong type once, then validate as empty ---
        match = rule.get("match")
        if not isinstance(match, dict):
            errors.append(f"{prefix}: `match` doit etre un mapping.")
            match = {}

        # ``normalization`` is optional; only a present, non-mapping value is an error.
        normalization = rule.get("normalization")
        if normalization and not isinstance(normalization, dict):
            errors.append(f"{prefix}: `normalization` doit etre un mapping.")

        scope = rule.get("scope")
        if not isinstance(scope, dict):
            errors.append(f"{prefix}: `scope` doit etre un mapping.")
            scope = {}

        governance = rule.get("governance")
        if not isinstance(governance, dict):
            errors.append(f"{prefix}: `governance` doit etre un mapping.")
            governance = {}

        # --- scope ---
        document_families = scope.get("document_families")
        if not isinstance(document_families, list) or not document_families:
            errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.")

        environments = scope.get("environments")
        if not isinstance(environments, list) or not environments:
            errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.")
        else:
            invalid_envs = _invalid_choices(environments, VALID_ENVIRONMENTS)
            if invalid_envs:
                errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.")

        sections = scope.get("sections")
        if not isinstance(sections, list) or not sections:
            errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.")
        else:
            invalid_sections = _invalid_choices(sections, VALID_SECTIONS)
            if invalid_sections:
                errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.")

        # --- governance ---
        if not _is_non_empty_string(governance.get("owner")):
            errors.append(f"{prefix}: `governance.owner` est obligatoire.")
        if not _is_non_empty_string(governance.get("justification")):
            errors.append(f"{prefix}: `governance.justification` est obligatoire.")
        if not _is_non_empty_string(governance.get("created_at")):
            errors.append(f"{prefix}: `governance.created_at` est obligatoire.")

        tests = governance.get("tests")
        if not isinstance(tests, dict):
            errors.append(f"{prefix}: `governance.tests` doit etre un mapping.")
            tests = {}
        required_case_ids = tests.get("required_case_ids")
        if not isinstance(required_case_ids, list) or not required_case_ids:
            errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.")

        # --- type-specific requirements ---
        if rule_type == "exact_term":
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.")

        if rule_type == "preserve_phrase":
            if action != "preserve":
                errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.")
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.")

        if rule_type == "normalized_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.")

        if rule_type == "contextual_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.")
            context_prefixes = match.get("context_prefixes")
            if not isinstance(context_prefixes, list) or not context_prefixes:
                errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.")

        # An active rule flagged as needing review must name its approver.
        if status == "active" and governance.get("review_required_for_activation", False):
            if not _is_non_empty_string(governance.get("approved_by")):
                errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.")

    return errors
|
||||||
|
|
||||||
|
|
||||||
|
def _placeholder_to_kind(placeholder: str) -> str:
|
||||||
|
if isinstance(placeholder, str) and placeholder.startswith("[") and placeholder.endswith("]"):
|
||||||
|
return placeholder[1:-1]
|
||||||
|
return "MASK"
|
||||||
|
|
||||||
|
|
||||||
|
def _literal_to_pattern(text: str, multiline: bool) -> str:
|
||||||
|
parts: list[str] = []
|
||||||
|
for char in text:
|
||||||
|
if char == " ":
|
||||||
|
parts.append(r"\s*" if multiline else r"[ \t]*")
|
||||||
|
elif char == "\n":
|
||||||
|
parts.append(r"\s*" if multiline else r"\n")
|
||||||
|
else:
|
||||||
|
parts.append(re.escape(char))
|
||||||
|
return "".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
def _compile_identifier_rule(rule: dict[str, Any]) -> dict[str, Any]:
    """Compile one identifier rule into regex patterns plus masking metadata.

    Args:
        rule: A ``normalized_identifier`` rule, or any other type, which is
            treated as contextual (prefixes and separators read from ``match``).

    Returns:
        A dict with ``id``, ``type``, ``kind`` (derived from the
        placeholder), ``placeholder`` and ``patterns``. Each compiled
        pattern captures the canonical value in group 1 and refuses an ASCII
        letter or digit directly before the match start or after the value.
        ``patterns`` is empty when the canonical value is missing, null or
        blank. A missing, null, non-string or blank placeholder is replaced
        by ``"[MASK]"``, so consumers that ``str()`` it never emit the
        literal text ``"None"``.
    """
    rule_type = rule.get("type")
    # Tolerate malformed sections: this runs even on configs that failed validation.
    raw_match = rule.get("match")
    match = raw_match if isinstance(raw_match, dict) else {}
    raw_normalization = rule.get("normalization")
    normalization = raw_normalization if isinstance(raw_normalization, dict) else {}

    multiline = bool(normalization.get("multiline", False))
    flags = re.MULTILINE
    if normalization.get("case_insensitive", False):
        flags |= re.IGNORECASE

    canonical = match.get("canonical_value")
    value = "" if canonical is None else str(canonical).strip()

    if rule_type == "normalized_identifier":
        prefixes = normalization.get("accepted_prefixes") or []
        separators = normalization.get("prefix_value_separators") or [" "]
        allow_bare = bool(normalization.get("allow_bare_value", False))
    else:
        prefixes = match.get("context_prefixes") or []
        separators = match.get("context_separators") or [": ", ":"]
        allow_bare = False

    patterns = []
    if value:  # an empty value would compile patterns that match empty strings
        value_rx = re.escape(value)
        boundary_before = r"(?<![A-Za-z0-9])"
        boundary_after = r"(?![A-Za-z0-9])"
        # Extra whitespace tolerated between the separator and the value.
        gap = r"\s*" if multiline else r"[ \t]*"

        if allow_bare:
            patterns.append(re.compile(rf"{boundary_before}({value_rx}){boundary_after}", flags))

        for prefix in prefixes:
            prefix_rx = _literal_to_pattern(str(prefix), multiline)
            for separator in separators:
                separator_rx = _literal_to_pattern(str(separator), multiline)
                patterns.append(
                    re.compile(
                        rf"{boundary_before}{prefix_rx}{separator_rx}{gap}({value_rx}){boundary_after}",
                        flags,
                    )
                )

    placeholder = rule.get("placeholder")
    if not (isinstance(placeholder, str) and placeholder.strip()):
        placeholder = "[MASK]"

    return {
        "id": rule.get("id"),
        "type": rule_type,
        "kind": _placeholder_to_kind(placeholder),
        "placeholder": placeholder,
        "patterns": patterns,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def compile_active_admin_rules(data: dict[str, Any]) -> dict[str, Any]:
    """Reduce an admin-rules document to the structures used at masking time.

    Only mapping rules whose ``status`` is ``"active"`` are considered.

    Args:
        data: Admin-rules document. It need not have passed validation;
            malformed parts are skipped instead of raising.

    Returns:
        A dict with four keys:

        * ``force_mask_terms``: stripped ``exact_value`` of each
          ``exact_term`` rule with action ``mask`` (de-duplicated, ordered).
        * ``whitelist_phrases``: stripped ``exact_value`` of each
          ``preserve_phrase`` rule with action ``preserve`` (de-duplicated,
          ordered).
        * ``detection_rules``: compiled identifier rules for
          ``normalized_identifier`` / ``contextual_identifier`` rules with
          action ``mask`` and a non-blank string ``canonical_value``.
        * ``active_rule_ids``: the ``id`` of every active rule, in order,
          whether or not it contributed to the lists above.
    """

    def _exact_value(match: dict[str, Any]) -> str:
        raw = match.get("exact_value")
        # YAML ``null`` must not become the maskable word "None".
        return "" if raw is None else str(raw).strip()

    force_mask_terms: list[str] = []
    whitelist_phrases: list[str] = []
    detection_rules: list[dict[str, Any]] = []
    active_rule_ids: list[Any] = []

    rules = data.get("rules")
    if not isinstance(rules, (list, tuple)):
        rules = []

    for rule in rules:
        if not isinstance(rule, dict) or rule.get("status") != "active":
            continue
        active_rule_ids.append(rule.get("id"))

        rule_type = rule.get("type")
        action = rule.get("action")
        raw_match = rule.get("match")
        match = raw_match if isinstance(raw_match, dict) else {}

        if rule_type == "exact_term" and action == "mask":
            term = _exact_value(match)
            if term:
                force_mask_terms.append(term)
        elif rule_type == "preserve_phrase" and action == "preserve":
            phrase = _exact_value(match)
            if phrase:
                whitelist_phrases.append(phrase)
        # Tuple membership compares with ``==`` and so tolerates unhashable types.
        elif rule_type in ("normalized_identifier", "contextual_identifier") and action == "mask":
            if _is_non_empty_string(match.get("canonical_value")):
                detection_rules.append(_compile_identifier_rule(rule))

    return {
        "force_mask_terms": _dedupe_keep_order(force_mask_terms),
        "whitelist_phrases": _dedupe_keep_order(whitelist_phrases),
        "detection_rules": detection_rules,
        "active_rule_ids": active_rule_ids,
    }
|
||||||
@@ -49,6 +49,11 @@ from config_defaults import (
|
|||||||
load_effective_dictionaries_dict,
|
load_effective_dictionaries_dict,
|
||||||
load_default_dictionaries_dict,
|
load_default_dictionaries_dict,
|
||||||
)
|
)
|
||||||
|
from admin_rules import (
|
||||||
|
compile_active_admin_rules,
|
||||||
|
load_effective_admin_rules_dict,
|
||||||
|
validate_rules_config,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from doctr.models import ocr_predictor as _doctr_ocr_predictor
|
from doctr.models import ocr_predictor as _doctr_ocr_predictor
|
||||||
@@ -842,6 +847,30 @@ def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
|
|||||||
global _MEDICAL_STOP_WORDS_SET, _VILLE_BLACKLIST, _DPI_LABELS_SET, _COMPANION_BLACKLIST_SET
|
global _MEDICAL_STOP_WORDS_SET, _VILLE_BLACKLIST, _DPI_LABELS_SET, _COMPANION_BLACKLIST_SET
|
||||||
cfg = load_default_dictionaries_dict() if config_path is None else load_effective_dictionaries_dict(config_path)
|
cfg = load_default_dictionaries_dict() if config_path is None else load_effective_dictionaries_dict(config_path)
|
||||||
|
|
||||||
|
admin_rules_path = None if config_path is None else Path(config_path).with_name("admin_rules.yml")
|
||||||
|
admin_rules_cfg = load_effective_admin_rules_dict(admin_rules_path)
|
||||||
|
admin_rules_errors = validate_rules_config(admin_rules_cfg)
|
||||||
|
if admin_rules_errors:
|
||||||
|
log.warning("Configuration admin_rules invalide (%d erreur(s)); règles actives chargées en mode prudent.", len(admin_rules_errors))
|
||||||
|
for err in admin_rules_errors[:5]:
|
||||||
|
log.warning("admin_rules: %s", err)
|
||||||
|
compiled_admin_rules = compile_active_admin_rules(admin_rules_cfg)
|
||||||
|
|
||||||
|
blacklist = dict(cfg.get("blacklist", {}) or {})
|
||||||
|
force_mask_terms = list(blacklist.get("force_mask_terms", []) or [])
|
||||||
|
for term in compiled_admin_rules.get("force_mask_terms", []):
|
||||||
|
if term not in force_mask_terms:
|
||||||
|
force_mask_terms.append(term)
|
||||||
|
blacklist["force_mask_terms"] = force_mask_terms
|
||||||
|
cfg["blacklist"] = blacklist
|
||||||
|
|
||||||
|
whitelist_phrases = list(cfg.get("whitelist_phrases", []) or [])
|
||||||
|
for phrase in compiled_admin_rules.get("whitelist_phrases", []):
|
||||||
|
if phrase not in whitelist_phrases:
|
||||||
|
whitelist_phrases.append(phrase)
|
||||||
|
cfg["whitelist_phrases"] = whitelist_phrases
|
||||||
|
cfg["admin_rules_compiled"] = compiled_admin_rules
|
||||||
|
|
||||||
_MEDICAL_STOP_WORDS_SET = set(_BASE_MEDICAL_STOP_WORDS_SET)
|
_MEDICAL_STOP_WORDS_SET = set(_BASE_MEDICAL_STOP_WORDS_SET)
|
||||||
_VILLE_BLACKLIST = set(_BASE_VILLE_BLACKLIST)
|
_VILLE_BLACKLIST = set(_BASE_VILLE_BLACKLIST)
|
||||||
_DPI_LABELS_SET = set(_BASE_DPI_LABELS_SET)
|
_DPI_LABELS_SET = set(_BASE_DPI_LABELS_SET)
|
||||||
@@ -891,6 +920,29 @@ def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
|
|||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_admin_identifier_hits(full_raw: str, audit: List["PiiHit"], cfg: Dict[str, Any]) -> None:
    """Record a ``PiiHit`` for each identifier found by the compiled admin rules.

    Every pattern of every entry in
    ``cfg["admin_rules_compiled"]["detection_rules"]`` is run over
    *full_raw*. The stripped text of capture group 1 is appended to *audit*
    once per distinct (kind, value) pair; empty captures are ignored.

    Args:
        full_raw: Text to scan.
        audit: Hit list, extended in place.
        cfg: Loaded configuration; a missing or empty compiled-rules entry
            results in no hits.
    """
    admin_cfg = cfg.get("admin_rules_compiled") or {}
    detection_rules = admin_cfg.get("detection_rules", []) or []
    already_recorded: set[tuple[str, str]] = set()

    for detection_rule in detection_rules:
        kind = str(detection_rule.get("kind", "MASK"))
        for compiled_pattern in detection_rule.get("patterns", []) or []:
            for found in compiled_pattern.finditer(full_raw):
                captured = (found.group(1) or "").strip()
                key = (kind, captured)
                if not captured or key in already_recorded:
                    continue
                already_recorded.add(key)
                replacement = str(detection_rule.get("placeholder", PLACEHOLDERS["MASK"]))
                # First positional argument is -1, as in the original call;
                # presumably "no specific page" -- confirm against PiiHit.
                audit.append(PiiHit(-1, kind, captured, replacement))
|
||||||
|
|
||||||
# ----------------- Extraction -----------------
|
# ----------------- Extraction -----------------
|
||||||
|
|
||||||
_doctr_model_cache = None
|
_doctr_model_cache = None
|
||||||
@@ -2269,11 +2321,16 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_nam
|
|||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str:
|
def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit], cfg: Dict[str, Any] | None = None) -> str:
|
||||||
"""Applique les PiiHit non-NOM dans le texte (NDA, DOSSIER, EPISODE, RPPS, FINESS, etc.).
|
"""Applique les PiiHit non-NOM dans le texte (NDA, DOSSIER, EPISODE, RPPS, FINESS, etc.).
|
||||||
Ces hits sont détectés par _extract_trackare_identity ou la phase 0c
|
Ces hits sont détectés par _extract_trackare_identity ou la phase 0c
|
||||||
mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt."""
|
mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt."""
|
||||||
_APPLY_KINDS = {"DOSSIER", "EPISODE", "FINESS", "NDA", "RPPS"}
|
_APPLY_KINDS = {"DOSSIER", "EPISODE", "FINESS", "NDA", "RPPS"}
|
||||||
|
admin_rules = (cfg or {}).get("admin_rules_compiled") or {}
|
||||||
|
for rule in admin_rules.get("detection_rules", []) or []:
|
||||||
|
kind = rule.get("kind")
|
||||||
|
if kind:
|
||||||
|
_APPLY_KINDS.add(str(kind))
|
||||||
# Collecter les valeurs à remplacer, groupées par placeholder
|
# Collecter les valeurs à remplacer, groupées par placeholder
|
||||||
replacements: Dict[str, str] = {} # original → placeholder
|
replacements: Dict[str, str] = {} # original → placeholder
|
||||||
for h in audit:
|
for h in audit:
|
||||||
@@ -2416,6 +2473,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
|
|||||||
for m in _RE_VENUE_REVERSE.finditer(full_raw):
|
for m in _RE_VENUE_REVERSE.finditer(full_raw):
|
||||||
audit.append(PiiHit(-1, "NDA", m.group(1), PLACEHOLDERS["NDA"]))
|
audit.append(PiiHit(-1, "NDA", m.group(1), PLACEHOLDERS["NDA"]))
|
||||||
|
|
||||||
|
# Phase 0i : règles d'administration actives sur identifiants.
|
||||||
|
_apply_admin_identifier_hits(full_raw, audit, cfg)
|
||||||
|
|
||||||
# Phase 1 : masquage ligne par ligne (regex classiques)
|
# Phase 1 : masquage ligne par ligne (regex classiques)
|
||||||
out_pages: List[str] = []
|
out_pages: List[str] = []
|
||||||
for i, page_txt in enumerate(pages_text):
|
for i, page_txt in enumerate(pages_text):
|
||||||
@@ -2445,7 +2505,7 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
|
|||||||
text_out = _apply_extracted_names(text_out, all_names, audit, force_names=all_force_names)
|
text_out = _apply_extracted_names(text_out, all_names, audit, force_names=all_force_names)
|
||||||
|
|
||||||
# Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS)
|
# Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS)
|
||||||
text_out = _apply_trackare_hits_to_text(text_out, audit)
|
text_out = _apply_trackare_hits_to_text(text_out, audit, cfg)
|
||||||
|
|
||||||
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)
|
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)
|
||||||
|
|
||||||
|
|||||||
12
config/admin_rules.yml
Normal file
12
config/admin_rules.yml
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
# Surcharge locale optionnelle des règles d'administration.
|
||||||
|
# Les règles ci-dessous complètent ou modifient config/admin_rules.default.yml.
|
||||||
|
#
|
||||||
|
# Exemple pour activer localement une règle candidate :
|
||||||
|
# version: 1
|
||||||
|
# rules:
|
||||||
|
# - id: rule_identifier_1234567
|
||||||
|
# status: active
|
||||||
|
# governance:
|
||||||
|
# approved_by: responsable_qualite
|
||||||
|
version: 1
|
||||||
|
rules: []
|
||||||
141
tests/unit/test_admin_rules_integration.py
Normal file
141
tests/unit/test_admin_rules_integration.py
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Tests d'intégration des règles d'administration dans le moteur ONNX.
|
||||||
|
"""
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from anonymizer_core_refactored_onnx import (
|
||||||
|
anonymise_document_regex,
|
||||||
|
load_dictionaries,
|
||||||
|
selective_rescan,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_runtime_pair(tmp_path: Path, admin_rules_text: str) -> Path:
|
||||||
|
dict_path = tmp_path / "dictionnaires.yml"
|
||||||
|
dict_path.write_text("{}", encoding="utf-8")
|
||||||
|
(tmp_path / "admin_rules.yml").write_text(admin_rules_text, encoding="utf-8")
|
||||||
|
return dict_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_runtime_exact_term_rule_masks_local_sigle(tmp_path: Path):
|
||||||
|
cfg_path = _write_runtime_pair(
|
||||||
|
tmp_path,
|
||||||
|
"""version: 1
|
||||||
|
rules:
|
||||||
|
- id: rule_local_sigle
|
||||||
|
label: Masquer LOCAL_SIGLE
|
||||||
|
type: exact_term
|
||||||
|
action: mask
|
||||||
|
placeholder: "[MASK]"
|
||||||
|
status: active
|
||||||
|
match:
|
||||||
|
exact_value: LOCAL_SIGLE
|
||||||
|
scope:
|
||||||
|
document_families: [all]
|
||||||
|
environments: [test]
|
||||||
|
sections: [narrative, structured]
|
||||||
|
governance:
|
||||||
|
owner: qualite
|
||||||
|
justification: Test d'integration local.
|
||||||
|
created_at: "2026-04-21"
|
||||||
|
review_required_for_activation: true
|
||||||
|
approved_by: responsable_qualite
|
||||||
|
tests:
|
||||||
|
required_case_ids: [007_overlay_force_mask_local]
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg = load_dictionaries(cfg_path)
|
||||||
|
anon = anonymise_document_regex(["Réorientation vers LOCAL_SIGLE en urgence."], [[]], cfg)
|
||||||
|
text = selective_rescan(anon.text_out, cfg)
|
||||||
|
|
||||||
|
assert text == "Réorientation vers [MASK] en urgence."
|
||||||
|
assert any(hit.kind == "force_term" and hit.original == "LOCAL_SIGLE" for hit in anon.audit)
|
||||||
|
|
||||||
|
|
||||||
|
def test_runtime_normalized_identifier_masks_prefixed_and_bare_forms(tmp_path: Path):
|
||||||
|
cfg_path = _write_runtime_pair(
|
||||||
|
tmp_path,
|
||||||
|
"""version: 1
|
||||||
|
rules:
|
||||||
|
- id: rule_identifier_1234567
|
||||||
|
label: Identifier 1234567
|
||||||
|
type: normalized_identifier
|
||||||
|
action: mask
|
||||||
|
placeholder: "[NDA]"
|
||||||
|
status: active
|
||||||
|
match:
|
||||||
|
canonical_value: "1234567"
|
||||||
|
normalization:
|
||||||
|
case_insensitive: true
|
||||||
|
whole_word: true
|
||||||
|
multiline: true
|
||||||
|
allow_bare_value: true
|
||||||
|
accepted_prefixes: ["N°"]
|
||||||
|
prefix_value_separators: ["", " "]
|
||||||
|
scope:
|
||||||
|
document_families: [all]
|
||||||
|
environments: [test]
|
||||||
|
sections: [narrative, structured, table]
|
||||||
|
governance:
|
||||||
|
owner: qualite
|
||||||
|
justification: Test d'identifiant normalise.
|
||||||
|
created_at: "2026-04-21"
|
||||||
|
review_required_for_activation: true
|
||||||
|
approved_by: responsable_qualite
|
||||||
|
tests:
|
||||||
|
required_case_ids: [003_multiline_venue_number]
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg = load_dictionaries(cfg_path)
|
||||||
|
anon = anonymise_document_regex(["N°1234567 puis N° 1234567 et 1234567"], [[]], cfg)
|
||||||
|
text = selective_rescan(anon.text_out, cfg)
|
||||||
|
|
||||||
|
assert text == "N°[NDA] puis N° [NDA] et [NDA]"
|
||||||
|
assert "1234567" not in text
|
||||||
|
assert any(hit.kind == "NDA" and hit.original == "1234567" for hit in anon.audit)
|
||||||
|
|
||||||
|
|
||||||
|
def test_runtime_contextual_identifier_masks_multiline_and_propagates_value(tmp_path: Path):
|
||||||
|
cfg_path = _write_runtime_pair(
|
||||||
|
tmp_path,
|
||||||
|
"""version: 1
|
||||||
|
rules:
|
||||||
|
- id: rule_context_ipp
|
||||||
|
label: IPP contextuel
|
||||||
|
type: contextual_identifier
|
||||||
|
action: mask
|
||||||
|
placeholder: "[IPP]"
|
||||||
|
status: active
|
||||||
|
match:
|
||||||
|
canonical_value: ABC12345
|
||||||
|
context_prefixes: ["IPP"]
|
||||||
|
context_separators: [":", " : ", "\\n"]
|
||||||
|
normalization:
|
||||||
|
case_insensitive: true
|
||||||
|
whole_word: true
|
||||||
|
multiline: true
|
||||||
|
scope:
|
||||||
|
document_families: [all]
|
||||||
|
environments: [test]
|
||||||
|
sections: [structured, narrative]
|
||||||
|
governance:
|
||||||
|
owner: qualite
|
||||||
|
justification: Test d'identifiant contextuel.
|
||||||
|
created_at: "2026-04-21"
|
||||||
|
review_required_for_activation: true
|
||||||
|
approved_by: responsable_qualite
|
||||||
|
tests:
|
||||||
|
required_case_ids: [004_structured_admin_complete]
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg = load_dictionaries(cfg_path)
|
||||||
|
anon = anonymise_document_regex(["IPP\nABC12345\nRappel ABC12345"], [[]], cfg)
|
||||||
|
text = selective_rescan(anon.text_out, cfg)
|
||||||
|
|
||||||
|
assert text == "IPP\n[IPP]\nRappel [IPP]"
|
||||||
|
assert "ABC12345" not in text
|
||||||
|
assert any(hit.kind == "IPP" and hit.original == "ABC12345" for hit in anon.audit)
|
||||||
@@ -4,9 +4,9 @@ Tests de non-regression pour le contrat des regles d'administration.
|
|||||||
"""
|
"""
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from tools.validate_admin_rules import (
|
from admin_rules import (
|
||||||
generate_rule_variants,
|
generate_rule_variants,
|
||||||
load_rules_config,
|
load_effective_admin_rules_dict,
|
||||||
validate_rules_config,
|
validate_rules_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -14,7 +14,7 @@ from tools.validate_admin_rules import (
|
|||||||
def test_default_admin_rules_template_is_valid():
|
def test_default_admin_rules_template_is_valid():
|
||||||
path = Path("config/admin_rules.default.yml")
|
path = Path("config/admin_rules.default.yml")
|
||||||
|
|
||||||
data = load_rules_config(path)
|
data = load_effective_admin_rules_dict(path)
|
||||||
errors = validate_rules_config(data)
|
errors = validate_rules_config(data)
|
||||||
|
|
||||||
assert errors == []
|
assert errors == []
|
||||||
|
|||||||
@@ -1,221 +1,16 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""Validation semantique des regles d'administration."""
|
||||||
Validation semantique des regles d'administration.
|
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
import sys
|
||||||
|
|
||||||
import yaml
|
ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
if str(ROOT) not in sys.path:
|
||||||
|
sys.path.insert(0, str(ROOT))
|
||||||
|
|
||||||
|
from admin_rules import generate_rule_variants, load_effective_admin_rules_dict, validate_rules_config
|
||||||
# Rule kinds accepted in a rule's `type` field.
VALID_TYPES = {
    "exact_term",
    "normalized_identifier",
    "contextual_identifier",
    "preserve_phrase",
}
# Values accepted in a rule's `action` field.
VALID_ACTIONS = {"mask", "preserve"}
# Values accepted in a rule's `status` field.
VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"}
# Values accepted inside `scope.environments`.
VALID_ENVIRONMENTS = {"test", "staging", "prod"}
# Values accepted inside `scope.sections`.
VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"}
|
|
||||||
|
|
||||||
|
|
||||||
def load_rules_config(path: Path) -> dict[str, Any]:
    """Load an admin-rules YAML file and return its root mapping.

    Args:
        path: Location of the YAML file; it is read as UTF-8.

    Returns:
        The parsed root mapping, or an empty dict when the document is empty
        (YAML null).

    Raises:
        ValueError: If the document root is anything other than a mapping.
    """
    with path.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle)

    # Only an empty document falls back to {}. A blanket `or {}` would also
    # swallow falsy non-mapping roots ([], 0, false, "") and let them slip
    # past the mapping check below.
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise ValueError("Le fichier doit contenir un mapping YAML en racine.")
    return data
|
|
||||||
|
|
||||||
|
|
||||||
def _is_non_empty_string(value: Any) -> bool:
|
|
||||||
return isinstance(value, str) and bool(value.strip())
|
|
||||||
|
|
||||||
|
|
||||||
def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]:
    """Build sample text variants that a rule is expected to match.

    Args:
        rule: One rule mapping. The keys read are ``type``, ``match`` and
            ``normalization``; a missing or non-mapping ``match`` /
            ``normalization`` is treated as empty.
        limit: Maximum number of variants returned for identifier rules.

    Returns:
        * ``exact_term`` / ``preserve_phrase``: the stripped
          ``match.exact_value`` as a single-item list, or ``[]`` when it is
          missing or blank.
        * ``normalized_identifier``: the bare canonical value (when
          ``normalization.allow_bare_value`` is set), then for every accepted
          prefix one variant per separator (default ``" "``) and, when
          ``normalization.multiline`` is set, ``prefix\\ncanonical``.
        * ``contextual_identifier``: for every context prefix one variant per
          separator (default ``": "`` and ``":"``) and, when multiline is
          set, ``prefix\\ncanonical`` and ``prefix :\\ncanonical``.
        * any other type, or an identifier rule without a canonical value:
          ``[]``.

        Duplicates are dropped (first occurrence kept) before ``limit`` is
        applied.
    """
    rule_type = rule.get("type")
    match = rule.get("match")
    if not isinstance(match, dict):
        match = {}
    normalization = rule.get("normalization")
    if not isinstance(normalization, dict):
        normalization = {}

    # Tuple membership compares with ==, so an unhashable `type` (e.g. a YAML
    # list) simply fails to match instead of raising TypeError.
    if rule_type in ("exact_term", "preserve_phrase"):
        exact_value = _variant_text(match.get("exact_value"))
        return [exact_value] if exact_value else []

    if rule_type == "normalized_identifier":
        bare_allowed = bool(normalization.get("allow_bare_value", False))
        prefixes = normalization.get("accepted_prefixes") or []
        separators = normalization.get("prefix_value_separators") or [" "]
        multiline_joiners = ["\n"]
    elif rule_type == "contextual_identifier":
        bare_allowed = False
        prefixes = match.get("context_prefixes") or []
        separators = match.get("context_separators") or [": ", ":"]
        multiline_joiners = ["\n", " :\n"]
    else:
        return []

    canonical = _variant_text(match.get("canonical_value"))
    if not canonical:
        # Without a value every variant would be a bare prefix, which is
        # meaningless as a match sample.
        return []

    variants: list[str] = [canonical] if bare_allowed else []
    multiline = normalization.get("multiline", False)
    for prefix in prefixes:
        variants.extend(f"{prefix}{separator}{canonical}" for separator in separators)
        if multiline:
            variants.extend(f"{prefix}{joiner}{canonical}" for joiner in multiline_joiners)

    # dict.fromkeys keeps the first occurrence of each variant, in order.
    return list(dict.fromkeys(variants))[:limit]


def _variant_text(value: Any) -> str:
    """Return value as stripped text; None becomes "" rather than the text "None"."""
    return "" if value is None else str(value).strip()
|
|
||||||
|
|
||||||
|
|
||||||
def _dedupe_keep_order(values: list[str]) -> list[str]:
|
|
||||||
seen: set[str] = set()
|
|
||||||
output: list[str] = []
|
|
||||||
for value in values:
|
|
||||||
if value in seen:
|
|
||||||
continue
|
|
||||||
seen.add(value)
|
|
||||||
output.append(value)
|
|
||||||
return output
|
|
||||||
|
|
||||||
|
|
||||||
def validate_rules_config(data: dict[str, Any]) -> list[str]:
    """Check an admin-rules configuration and collect every problem found.

    The function never raises on malformed rule content: each problem is
    reported as one message, prefixed with ``rules[<index>]`` for per-rule
    problems.

    Args:
        data: Root mapping of the configuration (``version`` and ``rules``).

    Returns:
        The list of error messages, empty when the configuration is valid.
        When ``rules`` is not a list the per-rule checks are skipped.
    """
    errors: list[str] = []

    version = data.get("version")
    # bool is a subclass of int, so `version: true` must be rejected explicitly.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        errors.append("`version` doit etre un entier >= 1.")

    rules = data.get("rules")
    if not isinstance(rules, list):
        errors.append("`rules` doit etre une liste.")
        return errors

    seen_ids: set[str] = set()
    for index, rule in enumerate(rules):
        prefix = f"rules[{index}]"
        if not isinstance(rule, dict):
            errors.append(f"{prefix}: chaque regle doit etre un mapping.")
            continue

        rule_id = rule.get("id")
        if not _is_non_empty_string(rule_id):
            errors.append(f"{prefix}: `id` est obligatoire.")
        elif rule_id in seen_ids:
            errors.append(f"{prefix}: `id` duplique `{rule_id}`.")
        else:
            seen_ids.add(rule_id)

        if not _is_non_empty_string(rule.get("label")):
            errors.append(f"{prefix}: `label` est obligatoire.")

        # The enumerated fields go through _is_allowed_choice so that an
        # unhashable YAML value (list, mapping) is reported as invalid
        # instead of raising TypeError on set membership.
        rule_type = rule.get("type")
        if not _is_allowed_choice(rule_type, VALID_TYPES):
            errors.append(f"{prefix}: `type` invalide.")

        action = rule.get("action")
        if not _is_allowed_choice(action, VALID_ACTIONS):
            errors.append(f"{prefix}: `action` invalide.")

        status = rule.get("status")
        if not _is_allowed_choice(status, VALID_STATUSES):
            errors.append(f"{prefix}: `status` invalide.")

        if action == "mask" and not _is_non_empty_string(rule.get("placeholder")):
            errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.")

        # Malformed sections are reported once, then replaced by an empty
        # mapping so the remaining checks can still run.
        match = rule.get("match")
        if not isinstance(match, dict):
            errors.append(f"{prefix}: `match` doit etre un mapping.")
            match = {}

        normalization = rule.get("normalization") or {}
        if normalization and not isinstance(normalization, dict):
            errors.append(f"{prefix}: `normalization` doit etre un mapping.")
            normalization = {}

        scope = rule.get("scope")
        if not isinstance(scope, dict):
            errors.append(f"{prefix}: `scope` doit etre un mapping.")
            scope = {}

        governance = rule.get("governance")
        if not isinstance(governance, dict):
            errors.append(f"{prefix}: `governance` doit etre un mapping.")
            governance = {}

        document_families = scope.get("document_families")
        if not isinstance(document_families, list) or not document_families:
            errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.")

        environments = scope.get("environments")
        if not isinstance(environments, list) or not environments:
            errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.")
        else:
            invalid_envs = _disallowed_entries(environments, VALID_ENVIRONMENTS)
            if invalid_envs:
                errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.")

        sections = scope.get("sections")
        if not isinstance(sections, list) or not sections:
            errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.")
        else:
            invalid_sections = _disallowed_entries(sections, VALID_SECTIONS)
            if invalid_sections:
                errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.")

        if not _is_non_empty_string(governance.get("owner")):
            errors.append(f"{prefix}: `governance.owner` est obligatoire.")
        if not _is_non_empty_string(governance.get("justification")):
            errors.append(f"{prefix}: `governance.justification` est obligatoire.")
        if not _is_non_empty_string(governance.get("created_at")):
            errors.append(f"{prefix}: `governance.created_at` est obligatoire.")

        tests = governance.get("tests")
        if not isinstance(tests, dict):
            errors.append(f"{prefix}: `governance.tests` doit etre un mapping.")
            tests = {}
        required_case_ids = tests.get("required_case_ids")
        if not isinstance(required_case_ids, list) or not required_case_ids:
            errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.")

        if rule_type == "exact_term":
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.")

        if rule_type == "preserve_phrase":
            if action != "preserve":
                errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.")
            if not _is_non_empty_string(match.get("exact_value")):
                errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.")

        if rule_type == "normalized_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.")
            prefixes = normalization.get("accepted_prefixes", [])
            if prefixes and not isinstance(prefixes, list):
                errors.append(f"{prefix}: `normalization.accepted_prefixes` doit etre une liste.")
            separators = normalization.get("prefix_value_separators", [])
            if separators and not isinstance(separators, list):
                errors.append(f"{prefix}: `normalization.prefix_value_separators` doit etre une liste.")

        if rule_type == "contextual_identifier":
            if not _is_non_empty_string(match.get("canonical_value")):
                errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.")
            context_prefixes = match.get("context_prefixes")
            if not isinstance(context_prefixes, list) or not context_prefixes:
                errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.")

        # An active rule flagged as needing review must name its approver.
        if status == "active" and governance.get("review_required_for_activation", False):
            if not _is_non_empty_string(governance.get("approved_by")):
                errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.")

    return errors


def _is_allowed_choice(value: Any, allowed: set[str]) -> bool:
    """Return True when value is a string that belongs to allowed."""
    return isinstance(value, str) and value in allowed


def _disallowed_entries(values: list[Any], allowed: set[str]) -> list[str]:
    """Return the text form of every entry of values that is not an allowed string."""
    # str() keeps string entries unchanged and makes non-string entries safe
    # to pass to ", ".join(...) in the error message.
    return [str(value) for value in values if not _is_allowed_choice(value, allowed)]
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> int:
|
def main() -> int:
|
||||||
@@ -233,7 +28,7 @@ def main() -> int:
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
config_path = Path(args.config)
|
config_path = Path(args.config)
|
||||||
data = load_rules_config(config_path)
|
data = load_effective_admin_rules_dict(config_path)
|
||||||
errors = validate_rules_config(data)
|
errors = validate_rules_config(data)
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
|
|||||||
Reference in New Issue
Block a user