From bc24a21fea8db13dcdb96b3f06a5d78050608f8c Mon Sep 17 00:00:00 2001 From: Domi31tls Date: Tue, 21 Apr 2026 12:10:17 +0200 Subject: [PATCH] Wire admin rules into ONNX anonymizer --- admin_rules.py | 406 +++++++++++++++++++++ anonymizer_core_refactored_onnx.py | 64 +++- config/admin_rules.yml | 12 + tests/unit/test_admin_rules_integration.py | 141 +++++++ tests/unit/test_admin_rules_validator.py | 6 +- tools/validate_admin_rules.py | 219 +---------- 6 files changed, 631 insertions(+), 217 deletions(-) create mode 100644 admin_rules.py create mode 100644 config/admin_rules.yml create mode 100644 tests/unit/test_admin_rules_integration.py diff --git a/admin_rules.py b/admin_rules.py new file mode 100644 index 0000000..33e23df --- /dev/null +++ b/admin_rules.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +Helpers partagés pour les règles d'administration. +""" +from __future__ import annotations + +from copy import deepcopy +from pathlib import Path +from typing import Any +import re + +try: + import yaml +except Exception: + yaml = None + +from config_defaults import CONFIG_DIR, deep_merge_dict + + +DEFAULT_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.default.yml" +RUNTIME_ADMIN_RULES_CONFIG_PATH = CONFIG_DIR / "admin_rules.yml" + +_RUNTIME_ADMIN_RULES_OVERLAY_TEXT = """# Surcharge locale des règles d'administration. +# Ce fichier est optionnel. Les règles actives de config/admin_rules.default.yml +# restent valides tant qu'aucune surcharge locale n'est définie ici. +# +# Exemple : +# version: 1 +# rules: +# - id: rule_identifier_1234567 +# status: active +# governance: +# approved_by: responsable_qualite +version: 1 +rules: [] +""" + +_FALLBACK_DEFAULT_ADMIN_RULES_DICT: dict[str, Any] = { + "version": 1, + "rules": [], +} + + +def _is_non_empty_string(value: Any) -> bool: + return isinstance(value, str) and bool(value.strip()) + + +def read_default_admin_rules_text() -> str: + try: + return DEFAULT_ADMIN_RULES_CONFIG_PATH.read_text(encoding="utf-8") + except Exception: + return "version: 1\nrules: []\n" + + +def read_runtime_admin_rules_overlay_text() -> str: + return _RUNTIME_ADMIN_RULES_OVERLAY_TEXT + + +def load_default_admin_rules_dict() -> dict[str, Any]: + if yaml is None: + return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT) + try: + loaded = yaml.safe_load(read_default_admin_rules_text()) or {} + if isinstance(loaded, dict): + return loaded + except Exception: + pass + return deepcopy(_FALLBACK_DEFAULT_ADMIN_RULES_DICT) + + +def load_runtime_admin_rules_overlay_dict(path: Path | None = None) -> dict[str, Any]: + target = Path(path) if path is not None else RUNTIME_ADMIN_RULES_CONFIG_PATH + if not target.exists() or yaml is None: + return {} + try: + loaded = yaml.safe_load(target.read_text(encoding="utf-8")) or {} + if isinstance(loaded, dict): + return loaded + except Exception: + pass + return {} + + +def _merge_rules_by_id(base_rules: list[dict[str, Any]], overlay_rules: list[dict[str, Any]]) -> list[dict[str, Any]]: + merged: list[dict[str, Any]] = [deepcopy(rule) for rule in base_rules] + index_by_id = { + rule.get("id"): idx + for idx, rule in enumerate(merged) + if isinstance(rule, dict) and _is_non_empty_string(rule.get("id")) + } + for overlay_rule in overlay_rules: + if not isinstance(overlay_rule, dict): + continue + rule_id = overlay_rule.get("id") + if _is_non_empty_string(rule_id) and rule_id in index_by_id: + idx = index_by_id[rule_id] + merged[idx] = deep_merge_dict(merged[idx], overlay_rule) + else: + merged.append(deepcopy(overlay_rule)) + if _is_non_empty_string(rule_id): + index_by_id[rule_id] = len(merged) - 1 + return merged + + +def merge_admin_rules_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]: + merged = deep_merge_dict(base, {k: v for k, v in overlay.items() if k != "rules"}) + merged["rules"] = _merge_rules_by_id(base.get("rules", []) or [], overlay.get("rules", []) or []) + return merged + + +def load_effective_admin_rules_dict(path: Path | None = None) -> dict[str, Any]: + return merge_admin_rules_dict( + load_default_admin_rules_dict(), + load_runtime_admin_rules_overlay_dict(path), + ) + + +def ensure_runtime_admin_rules_config(path: Path | None = None) -> Path: + target = Path(path) if path is not None else RUNTIME_ADMIN_RULES_CONFIG_PATH + if not target.exists(): + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(read_runtime_admin_rules_overlay_text(), encoding="utf-8") + return target + + +def _dedupe_keep_order(values: list[str]) -> list[str]: + seen: set[str] = set() + output: list[str] = [] + for value in values: + if value in seen: + continue + seen.add(value) + output.append(value) + return output + + +def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]: + rule_type = rule.get("type") + match = rule.get("match") or {} + normalization = rule.get("normalization") or {} + variants: list[str] = [] + + if rule_type in {"exact_term", "preserve_phrase"}: + exact_value = str(match.get("exact_value", "")).strip() + return [exact_value] if exact_value else [] + + if rule_type == "normalized_identifier": + canonical = str(match.get("canonical_value", "")).strip() + prefixes = normalization.get("accepted_prefixes") or [] + separators = normalization.get("prefix_value_separators") or [" "] + if normalization.get("allow_bare_value", False) and canonical: + variants.append(canonical) + for prefix in prefixes: + for separator in separators: + variants.append(f"{prefix}{separator}{canonical}") + if normalization.get("multiline", False): + variants.append(f"{prefix}\n{canonical}") + return _dedupe_keep_order(variants)[:limit] + + if rule_type == "contextual_identifier": + canonical = str(match.get("canonical_value", "")).strip() + prefixes = match.get("context_prefixes") or [] + separators = match.get("context_separators") or [": ", ":"] + for prefix in prefixes: + for separator in separators: + variants.append(f"{prefix}{separator}{canonical}") + if (rule.get("normalization") or {}).get("multiline", False): + variants.append(f"{prefix}\n{canonical}") + variants.append(f"{prefix} :\n{canonical}") + return _dedupe_keep_order(variants)[:limit] + + return [] + + +VALID_TYPES = { + "exact_term", + "normalized_identifier", + "contextual_identifier", + "preserve_phrase", +} +VALID_ACTIONS = {"mask", "preserve"} +VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"} +VALID_ENVIRONMENTS = {"test", "staging", "prod"} +VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"} + + +def validate_rules_config(data: dict[str, Any]) -> list[str]: + errors: list[str] = [] + + version = data.get("version") + if not isinstance(version, int) or version < 1: + errors.append("`version` doit etre un entier >= 1.") + + rules = data.get("rules") + if not isinstance(rules, list): + errors.append("`rules` doit etre une liste.") + return errors + + seen_ids: set[str] = set() + for index, rule in enumerate(rules): + prefix = f"rules[{index}]" + if not isinstance(rule, dict): + errors.append(f"{prefix}: chaque regle doit etre un mapping.") + continue + + rule_id = rule.get("id") + if not _is_non_empty_string(rule_id): + errors.append(f"{prefix}: `id` est obligatoire.") + elif rule_id in seen_ids: + errors.append(f"{prefix}: `id` duplique `{rule_id}`.") + else: + seen_ids.add(rule_id) + + if not _is_non_empty_string(rule.get("label")): + errors.append(f"{prefix}: `label` est obligatoire.") + + rule_type = rule.get("type") + if rule_type not in VALID_TYPES: + errors.append(f"{prefix}: `type` invalide.") + + action = rule.get("action") + if action not in VALID_ACTIONS: + errors.append(f"{prefix}: `action` invalide.") + + status = rule.get("status") + if status not in VALID_STATUSES: + errors.append(f"{prefix}: `status` invalide.") + + if action == "mask" and not _is_non_empty_string(rule.get("placeholder")): + errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.") + + match = rule.get("match") + if not isinstance(match, dict): + errors.append(f"{prefix}: `match` doit etre un mapping.") + match = {} + + normalization = rule.get("normalization") or {} + if normalization and not isinstance(normalization, dict): + errors.append(f"{prefix}: `normalization` doit etre un mapping.") + normalization = {} + + scope = rule.get("scope") + if not isinstance(scope, dict): + errors.append(f"{prefix}: `scope` doit etre un mapping.") + scope = {} + + governance = rule.get("governance") + if not isinstance(governance, dict): + errors.append(f"{prefix}: `governance` doit etre un mapping.") + governance = {} + + document_families = scope.get("document_families") + if not isinstance(document_families, list) or not document_families: + errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.") + + environments = scope.get("environments") + if not isinstance(environments, list) or not environments: + errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.") + else: + invalid_envs = [value for value in environments if value not in VALID_ENVIRONMENTS] + if invalid_envs: + errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.") + + sections = scope.get("sections") + if not isinstance(sections, list) or not sections: + errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.") + else: + invalid_sections = [value for value in sections if value not in VALID_SECTIONS] + if invalid_sections: + errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.") + + if not _is_non_empty_string(governance.get("owner")): + errors.append(f"{prefix}: `governance.owner` est obligatoire.") + if not _is_non_empty_string(governance.get("justification")): + errors.append(f"{prefix}: `governance.justification` est obligatoire.") + if not _is_non_empty_string(governance.get("created_at")): + errors.append(f"{prefix}: `governance.created_at` est obligatoire.") + + tests = governance.get("tests") + if not isinstance(tests, dict): + errors.append(f"{prefix}: `governance.tests` doit etre un mapping.") + tests = {} + required_case_ids = tests.get("required_case_ids") + if not isinstance(required_case_ids, list) or not required_case_ids: + errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.") + + if rule_type == "exact_term": + if not _is_non_empty_string(match.get("exact_value")): + errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.") + + if rule_type == "preserve_phrase": + if action != "preserve": + errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.") + if not _is_non_empty_string(match.get("exact_value")): + errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.") + + if rule_type == "normalized_identifier": + if not _is_non_empty_string(match.get("canonical_value")): + errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.") + + if rule_type == "contextual_identifier": + if not _is_non_empty_string(match.get("canonical_value")): + errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.") + context_prefixes = match.get("context_prefixes") + if not isinstance(context_prefixes, list) or not context_prefixes: + errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.") + + if status == "active" and governance.get("review_required_for_activation", False): + if not _is_non_empty_string(governance.get("approved_by")): + errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.") + + return errors + + +def _placeholder_to_kind(placeholder: str) -> str: + if isinstance(placeholder, str) and placeholder.startswith("[") and placeholder.endswith("]"): + return placeholder[1:-1] + return "MASK" + + +def _literal_to_pattern(text: str, multiline: bool) -> str: + parts: list[str] = [] + for char in text: + if char == " ": + parts.append(r"\s*" if multiline else r"[ \t]*") + elif char == "\n": + parts.append(r"\s*" if multiline else r"\n") + else: + parts.append(re.escape(char)) + return "".join(parts) + + +def _compile_identifier_rule(rule: dict[str, Any]) -> dict[str, Any]: + rule_type = rule.get("type") + normalization = rule.get("normalization") or {} + multiline = bool(normalization.get("multiline", False)) + flags = re.IGNORECASE if normalization.get("case_insensitive", False) else 0 + value = str((rule.get("match") or {}).get("canonical_value", "")).strip() + value_rx = re.escape(value) + boundary_before = r"(? dict[str, Any]: + compiled = { + "force_mask_terms": [], + "whitelist_phrases": [], + "detection_rules": [], + "active_rule_ids": [], + } + + for rule in data.get("rules", []) or []: + if not isinstance(rule, dict): + continue + if rule.get("status") != "active": + continue + compiled["active_rule_ids"].append(rule.get("id")) + rule_type = rule.get("type") + action = rule.get("action") + match = rule.get("match") or {} + + if rule_type == "exact_term" and action == "mask": + value = str(match.get("exact_value", "")).strip() + if value: + compiled["force_mask_terms"].append(value) + elif rule_type == "preserve_phrase" and action == "preserve": + value = str(match.get("exact_value", "")).strip() + if value: + compiled["whitelist_phrases"].append(value) + elif rule_type in {"normalized_identifier", "contextual_identifier"} and action == "mask": + if _is_non_empty_string(match.get("canonical_value")): + compiled["detection_rules"].append(_compile_identifier_rule(rule)) + + compiled["force_mask_terms"] = _dedupe_keep_order(compiled["force_mask_terms"]) + compiled["whitelist_phrases"] = _dedupe_keep_order(compiled["whitelist_phrases"]) + return compiled diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 1629b59..ab70195 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -49,6 +49,11 @@ from config_defaults import ( load_effective_dictionaries_dict, load_default_dictionaries_dict, ) +from admin_rules import ( + compile_active_admin_rules, + load_effective_admin_rules_dict, + validate_rules_config, +) try: from doctr.models import ocr_predictor as _doctr_ocr_predictor @@ -842,6 +847,30 @@ def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]: global _MEDICAL_STOP_WORDS_SET, _VILLE_BLACKLIST, _DPI_LABELS_SET, _COMPANION_BLACKLIST_SET cfg = load_default_dictionaries_dict() if config_path is None else load_effective_dictionaries_dict(config_path) + admin_rules_path = None if config_path is None else Path(config_path).with_name("admin_rules.yml") + admin_rules_cfg = load_effective_admin_rules_dict(admin_rules_path) + admin_rules_errors = validate_rules_config(admin_rules_cfg) + if admin_rules_errors: + log.warning("Configuration admin_rules invalide (%d erreur(s)); règles actives chargées en mode prudent.", len(admin_rules_errors)) + for err in admin_rules_errors[:5]: + log.warning("admin_rules: %s", err) + compiled_admin_rules = compile_active_admin_rules(admin_rules_cfg) + + blacklist = dict(cfg.get("blacklist", {}) or {}) + force_mask_terms = list(blacklist.get("force_mask_terms", []) or []) + for term in compiled_admin_rules.get("force_mask_terms", []): + if term not in force_mask_terms: + force_mask_terms.append(term) + blacklist["force_mask_terms"] = force_mask_terms + cfg["blacklist"] = blacklist + + whitelist_phrases = list(cfg.get("whitelist_phrases", []) or []) + for phrase in compiled_admin_rules.get("whitelist_phrases", []): + if phrase not in whitelist_phrases: + whitelist_phrases.append(phrase) + cfg["whitelist_phrases"] = whitelist_phrases + cfg["admin_rules_compiled"] = compiled_admin_rules + _MEDICAL_STOP_WORDS_SET = set(_BASE_MEDICAL_STOP_WORDS_SET) _VILLE_BLACKLIST = set(_BASE_VILLE_BLACKLIST) _DPI_LABELS_SET = set(_BASE_DPI_LABELS_SET) @@ -891,6 +920,29 @@ def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]: return cfg + +def _apply_admin_identifier_hits(full_raw: str, audit: List["PiiHit"], cfg: Dict[str, Any]) -> None: + compiled = (cfg.get("admin_rules_compiled") or {}).get("detection_rules", []) or [] + seen: set[tuple[str, str]] = set() + for rule in compiled: + for pattern in rule.get("patterns", []) or []: + for match in pattern.finditer(full_raw): + value = (match.group(1) or "").strip() + if not value: + continue + dedupe_key = (str(rule.get("kind", "MASK")), value) + if dedupe_key in seen: + continue + seen.add(dedupe_key) + audit.append( + PiiHit( + -1, + str(rule.get("kind", "MASK")), + value, + str(rule.get("placeholder", PLACEHOLDERS["MASK"])), + ) + ) + # ----------------- Extraction ----------------- _doctr_model_cache = None @@ -2269,11 +2321,16 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_nam return text -def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str: +def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit], cfg: Dict[str, Any] | None = None) -> str: """Applique les PiiHit non-NOM dans le texte (NDA, DOSSIER, EPISODE, RPPS, FINESS, etc.). Ces hits sont détectés par _extract_trackare_identity ou la phase 0c mais n'étaient appliqués qu'au PDF raster, pas au fichier .pseudonymise.txt.""" _APPLY_KINDS = {"DOSSIER", "EPISODE", "FINESS", "NDA", "RPPS"} + admin_rules = (cfg or {}).get("admin_rules_compiled") or {} + for rule in admin_rules.get("detection_rules", []) or []: + kind = rule.get("kind") + if kind: + _APPLY_KINDS.add(str(kind)) # Collecter les valeurs à remplacer, groupées par placeholder replacements: Dict[str, str] = {} # original → placeholder for h in audit: @@ -2416,6 +2473,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] for m in _RE_VENUE_REVERSE.finditer(full_raw): audit.append(PiiHit(-1, "NDA", m.group(1), PLACEHOLDERS["NDA"])) + # Phase 0i : règles d'administration actives sur identifiants. + _apply_admin_identifier_hits(full_raw, audit, cfg) + # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): @@ -2445,7 +2505,7 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] text_out = _apply_extracted_names(text_out, all_names, audit, force_names=all_force_names) # Phase 2b : application globale des PiiHit (EPISODE, RPPS, FINESS) - text_out = _apply_trackare_hits_to_text(text_out, audit) + text_out = _apply_trackare_hits_to_text(text_out, audit, cfg) return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare) diff --git a/config/admin_rules.yml b/config/admin_rules.yml new file mode 100644 index 0000000..9b89d9e --- /dev/null +++ b/config/admin_rules.yml @@ -0,0 +1,12 @@ +# Surcharge locale optionnelle des règles d'administration. +# Les règles ci-dessous complètent ou modifient config/admin_rules.default.yml. +# +# Exemple pour activer localement une règle candidate : +# version: 1 +# rules: +# - id: rule_identifier_1234567 +# status: active +# governance: +# approved_by: responsable_qualite +version: 1 +rules: [] diff --git a/tests/unit/test_admin_rules_integration.py b/tests/unit/test_admin_rules_integration.py new file mode 100644 index 0000000..bc7682c --- /dev/null +++ b/tests/unit/test_admin_rules_integration.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +Tests d'intégration des règles d'administration dans le moteur ONNX. +""" +from pathlib import Path + +from anonymizer_core_refactored_onnx import ( + anonymise_document_regex, + load_dictionaries, + selective_rescan, +) + + +def _write_runtime_pair(tmp_path: Path, admin_rules_text: str) -> Path: + dict_path = tmp_path / "dictionnaires.yml" + dict_path.write_text("{}", encoding="utf-8") + (tmp_path / "admin_rules.yml").write_text(admin_rules_text, encoding="utf-8") + return dict_path + + +def test_runtime_exact_term_rule_masks_local_sigle(tmp_path: Path): + cfg_path = _write_runtime_pair( + tmp_path, + """version: 1 +rules: + - id: rule_local_sigle + label: Masquer LOCAL_SIGLE + type: exact_term + action: mask + placeholder: "[MASK]" + status: active + match: + exact_value: LOCAL_SIGLE + scope: + document_families: [all] + environments: [test] + sections: [narrative, structured] + governance: + owner: qualite + justification: Test d'integration local. + created_at: "2026-04-21" + review_required_for_activation: true + approved_by: responsable_qualite + tests: + required_case_ids: [007_overlay_force_mask_local] +""", + ) + + cfg = load_dictionaries(cfg_path) + anon = anonymise_document_regex(["Réorientation vers LOCAL_SIGLE en urgence."], [[]], cfg) + text = selective_rescan(anon.text_out, cfg) + + assert text == "Réorientation vers [MASK] en urgence." + assert any(hit.kind == "force_term" and hit.original == "LOCAL_SIGLE" for hit in anon.audit) + + +def test_runtime_normalized_identifier_masks_prefixed_and_bare_forms(tmp_path: Path): + cfg_path = _write_runtime_pair( + tmp_path, + """version: 1 +rules: + - id: rule_identifier_1234567 + label: Identifier 1234567 + type: normalized_identifier + action: mask + placeholder: "[NDA]" + status: active + match: + canonical_value: "1234567" + normalization: + case_insensitive: true + whole_word: true + multiline: true + allow_bare_value: true + accepted_prefixes: ["N°"] + prefix_value_separators: ["", " "] + scope: + document_families: [all] + environments: [test] + sections: [narrative, structured, table] + governance: + owner: qualite + justification: Test d'identifiant normalise. + created_at: "2026-04-21" + review_required_for_activation: true + approved_by: responsable_qualite + tests: + required_case_ids: [003_multiline_venue_number] +""", + ) + + cfg = load_dictionaries(cfg_path) + anon = anonymise_document_regex(["N°1234567 puis N° 1234567 et 1234567"], [[]], cfg) + text = selective_rescan(anon.text_out, cfg) + + assert text == "N°[NDA] puis N° [NDA] et [NDA]" + assert "1234567" not in text + assert any(hit.kind == "NDA" and hit.original == "1234567" for hit in anon.audit) + + +def test_runtime_contextual_identifier_masks_multiline_and_propagates_value(tmp_path: Path): + cfg_path = _write_runtime_pair( + tmp_path, + """version: 1 +rules: + - id: rule_context_ipp + label: IPP contextuel + type: contextual_identifier + action: mask + placeholder: "[IPP]" + status: active + match: + canonical_value: ABC12345 + context_prefixes: ["IPP"] + context_separators: [":", " : ", "\\n"] + normalization: + case_insensitive: true + whole_word: true + multiline: true + scope: + document_families: [all] + environments: [test] + sections: [structured, narrative] + governance: + owner: qualite + justification: Test d'identifiant contextuel. + created_at: "2026-04-21" + review_required_for_activation: true + approved_by: responsable_qualite + tests: + required_case_ids: [004_structured_admin_complete] +""", + ) + + cfg = load_dictionaries(cfg_path) + anon = anonymise_document_regex(["IPP\nABC12345\nRappel ABC12345"], [[]], cfg) + text = selective_rescan(anon.text_out, cfg) + + assert text == "IPP\n[IPP]\nRappel [IPP]" + assert "ABC12345" not in text + assert any(hit.kind == "IPP" and hit.original == "ABC12345" for hit in anon.audit) diff --git a/tests/unit/test_admin_rules_validator.py b/tests/unit/test_admin_rules_validator.py index 9645557..8407513 100644 --- a/tests/unit/test_admin_rules_validator.py +++ b/tests/unit/test_admin_rules_validator.py @@ -4,9 +4,9 @@ Tests de non-regression pour le contrat des regles d'administration. """ from pathlib import Path -from tools.validate_admin_rules import ( +from admin_rules import ( generate_rule_variants, - load_rules_config, + load_effective_admin_rules_dict, validate_rules_config, ) @@ -14,7 +14,7 @@ from tools.validate_admin_rules import ( def test_default_admin_rules_template_is_valid(): path = Path("config/admin_rules.default.yml") - data = load_rules_config(path) + data = load_effective_admin_rules_dict(path) errors = validate_rules_config(data) assert errors == [] diff --git a/tools/validate_admin_rules.py b/tools/validate_admin_rules.py index 9b73cd5..a88e5d6 100644 --- a/tools/validate_admin_rules.py +++ b/tools/validate_admin_rules.py @@ -1,221 +1,16 @@ #!/usr/bin/env python3 -""" -Validation semantique des regles d'administration. -""" +"""Validation semantique des regles d'administration.""" from __future__ import annotations import argparse from pathlib import Path -from typing import Any +import sys -import yaml +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) - -VALID_TYPES = { - "exact_term", - "normalized_identifier", - "contextual_identifier", - "preserve_phrase", -} -VALID_ACTIONS = {"mask", "preserve"} -VALID_STATUSES = {"draft", "candidate", "approved", "active", "disabled", "retired"} -VALID_ENVIRONMENTS = {"test", "staging", "prod"} -VALID_SECTIONS = {"narrative", "structured", "table", "header", "footer"} - - -def load_rules_config(path: Path) -> dict[str, Any]: - with path.open("r", encoding="utf-8") as handle: - data = yaml.safe_load(handle) or {} - if not isinstance(data, dict): - raise ValueError("Le fichier doit contenir un mapping YAML en racine.") - return data - - -def _is_non_empty_string(value: Any) -> bool: - return isinstance(value, str) and bool(value.strip()) - - -def generate_rule_variants(rule: dict[str, Any], limit: int = 12) -> list[str]: - rule_type = rule.get("type") - match = rule.get("match") or {} - normalization = rule.get("normalization") or {} - variants: list[str] = [] - - if rule_type in {"exact_term", "preserve_phrase"}: - exact_value = str(match.get("exact_value", "")).strip() - return [exact_value] if exact_value else [] - - if rule_type == "normalized_identifier": - canonical = str(match.get("canonical_value", "")).strip() - prefixes = normalization.get("accepted_prefixes") or [] - separators = normalization.get("prefix_value_separators") or [" "] - if normalization.get("allow_bare_value", False) and canonical: - variants.append(canonical) - for prefix in prefixes: - for separator in separators: - variants.append(f"{prefix}{separator}{canonical}") - if normalization.get("multiline", False): - variants.append(f"{prefix}\n{canonical}") - return _dedupe_keep_order(variants)[:limit] - - if rule_type == "contextual_identifier": - canonical = str(match.get("canonical_value", "")).strip() - prefixes = match.get("context_prefixes") or [] - separators = match.get("context_separators") or [": ", ":"] - for prefix in prefixes: - for separator in separators: - variants.append(f"{prefix}{separator}{canonical}") - if (rule.get("normalization") or {}).get("multiline", False): - variants.append(f"{prefix}\n{canonical}") - variants.append(f"{prefix} :\n{canonical}") - return _dedupe_keep_order(variants)[:limit] - - return [] - - -def _dedupe_keep_order(values: list[str]) -> list[str]: - seen: set[str] = set() - output: list[str] = [] - for value in values: - if value in seen: - continue - seen.add(value) - output.append(value) - return output - - -def validate_rules_config(data: dict[str, Any]) -> list[str]: - errors: list[str] = [] - - version = data.get("version") - if not isinstance(version, int) or version < 1: - errors.append("`version` doit etre un entier >= 1.") - - rules = data.get("rules") - if not isinstance(rules, list): - errors.append("`rules` doit etre une liste.") - return errors - - seen_ids: set[str] = set() - for index, rule in enumerate(rules): - prefix = f"rules[{index}]" - if not isinstance(rule, dict): - errors.append(f"{prefix}: chaque regle doit etre un mapping.") - continue - - rule_id = rule.get("id") - if not _is_non_empty_string(rule_id): - errors.append(f"{prefix}: `id` est obligatoire.") - elif rule_id in seen_ids: - errors.append(f"{prefix}: `id` duplique `{rule_id}`.") - else: - seen_ids.add(rule_id) - - if not _is_non_empty_string(rule.get("label")): - errors.append(f"{prefix}: `label` est obligatoire.") - - rule_type = rule.get("type") - if rule_type not in VALID_TYPES: - errors.append(f"{prefix}: `type` invalide.") - - action = rule.get("action") - if action not in VALID_ACTIONS: - errors.append(f"{prefix}: `action` invalide.") - - status = rule.get("status") - if status not in VALID_STATUSES: - errors.append(f"{prefix}: `status` invalide.") - - if action == "mask" and not _is_non_empty_string(rule.get("placeholder")): - errors.append(f"{prefix}: `placeholder` est obligatoire pour une regle de masquage.") - - match = rule.get("match") - if not isinstance(match, dict): - errors.append(f"{prefix}: `match` doit etre un mapping.") - match = {} - - normalization = rule.get("normalization") or {} - if normalization and not isinstance(normalization, dict): - errors.append(f"{prefix}: `normalization` doit etre un mapping.") - normalization = {} - - scope = rule.get("scope") - if not isinstance(scope, dict): - errors.append(f"{prefix}: `scope` doit etre un mapping.") - scope = {} - - governance = rule.get("governance") - if not isinstance(governance, dict): - errors.append(f"{prefix}: `governance` doit etre un mapping.") - governance = {} - - document_families = scope.get("document_families") - if not isinstance(document_families, list) or not document_families: - errors.append(f"{prefix}: `scope.document_families` doit etre une liste non vide.") - - environments = scope.get("environments") - if not isinstance(environments, list) or not environments: - errors.append(f"{prefix}: `scope.environments` doit etre une liste non vide.") - else: - invalid_envs = [value for value in environments if value not in VALID_ENVIRONMENTS] - if invalid_envs: - errors.append(f"{prefix}: environnements invalides: {', '.join(invalid_envs)}.") - - sections = scope.get("sections") - if not isinstance(sections, list) or not sections: - errors.append(f"{prefix}: `scope.sections` doit etre une liste non vide.") - else: - invalid_sections = [value for value in sections if value not in VALID_SECTIONS] - if invalid_sections: - errors.append(f"{prefix}: sections invalides: {', '.join(invalid_sections)}.") - - if not _is_non_empty_string(governance.get("owner")): - errors.append(f"{prefix}: `governance.owner` est obligatoire.") - if not _is_non_empty_string(governance.get("justification")): - errors.append(f"{prefix}: `governance.justification` est obligatoire.") - if not _is_non_empty_string(governance.get("created_at")): - errors.append(f"{prefix}: `governance.created_at` est obligatoire.") - - tests = governance.get("tests") - if not isinstance(tests, dict): - errors.append(f"{prefix}: `governance.tests` doit etre un mapping.") - tests = {} - required_case_ids = tests.get("required_case_ids") - if not isinstance(required_case_ids, list) or not required_case_ids: - errors.append(f"{prefix}: `governance.tests.required_case_ids` doit etre une liste non vide.") - - if rule_type == "exact_term": - if not _is_non_empty_string(match.get("exact_value")): - errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `exact_term`.") - - if rule_type == "preserve_phrase": - if action != "preserve": - errors.append(f"{prefix}: `preserve_phrase` doit utiliser `action: preserve`.") - if not _is_non_empty_string(match.get("exact_value")): - errors.append(f"{prefix}: `match.exact_value` est obligatoire pour `preserve_phrase`.") - - if rule_type == "normalized_identifier": - if not _is_non_empty_string(match.get("canonical_value")): - errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `normalized_identifier`.") - prefixes = normalization.get("accepted_prefixes", []) - if prefixes and not isinstance(prefixes, list): - errors.append(f"{prefix}: `normalization.accepted_prefixes` doit etre une liste.") - separators = normalization.get("prefix_value_separators", []) - if separators and not isinstance(separators, list): - errors.append(f"{prefix}: `normalization.prefix_value_separators` doit etre une liste.") - - if rule_type == "contextual_identifier": - if not _is_non_empty_string(match.get("canonical_value")): - errors.append(f"{prefix}: `match.canonical_value` est obligatoire pour `contextual_identifier`.") - context_prefixes = match.get("context_prefixes") - if not isinstance(context_prefixes, list) or not context_prefixes: - errors.append(f"{prefix}: `match.context_prefixes` doit etre une liste non vide.") - - if status == "active" and governance.get("review_required_for_activation", False): - if not _is_non_empty_string(governance.get("approved_by")): - errors.append(f"{prefix}: `governance.approved_by` est obligatoire pour une regle active.") - - return errors +from admin_rules import generate_rule_variants, load_effective_admin_rules_dict, validate_rules_config def main() -> int: @@ -233,7 +28,7 @@ def main() -> int: args = parser.parse_args() config_path = Path(args.config) - data = load_rules_config(config_path) + data = load_effective_admin_rules_dict(config_path) errors = validate_rules_config(data) if errors: