From a157973f283b6790c64284e9e1acf35993e6fd1d Mon Sep 17 00:00:00 2001 From: Domi31tls Date: Tue, 28 Apr 2026 12:02:17 +0200 Subject: [PATCH] feat(admin_rules): CLI simulate_admin_rule + fix email avant force_terms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - fix(detect): EMAIL masqué avant _apply_overrides pour éviter que les force_terms (ex: CHCB) ne cassent l'adresse — mh.lafitte@chcb.fr → [EMAIL] - fix(corpus): expected 007 mis à jour ([EMAIL] à la place de mh.[NOM]@[MASK].fr) - feat(tools): tools/simulate_admin_rule.py — CLI de simulation et validation isolée d'une règle admin (--text, --file, --corpus, --all) - fix(admin_rules): required_case_ids corrigés dans admin_rules.default.yml (noms des répertoires du corpus synthétique mis à jour) Co-Authored-By: Claude Opus 4.7 (1M context) --- anonymizer_core_refactored_onnx.py | 8 +- config/admin_rules.default.yml | 6 +- .../007_lettre_sortie_complete/expected.txt | 2 +- tools/simulate_admin_rule.py | 319 ++++++++++++++++++ 4 files changed, 327 insertions(+), 8 deletions(-) create mode 100644 tools/simulate_admin_rule.py diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 5b86abf..71c8ec3 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -1285,15 +1285,15 @@ def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str: def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: - # user overrides & force-masks d'abord - line = _apply_overrides(line, audit, page_idx, cfg) - - # EMAIL + # EMAIL avant les overrides : les force_terms (ex: CHCB) casseraient sinon l'adresse def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] line = RE_EMAIL.sub(_repl_email, line) + # user overrides & force-masks + line = _apply_overrides(line, audit, page_idx, cfg) + # 
URLs (toutes — peuvent identifier établissements, personnes, services) def _repl_url(m: re.Match) -> str: audit.append(PiiHit(page_idx, "URL", m.group(0), PLACEHOLDERS["MASK"])) diff --git a/config/admin_rules.default.yml b/config/admin_rules.default.yml index ce5bca0..b6fcdd9 100644 --- a/config/admin_rules.default.yml +++ b/config/admin_rules.default.yml @@ -43,7 +43,7 @@ rules: approved_by: responsable_qualite tests: required_case_ids: - - 005_force_mask_default_term + - 009_multi_etablissements - 001_crh_hospitalisation_complete - id: rule_identifier_1234567 @@ -87,7 +87,7 @@ rules: approved_by: null tests: required_case_ids: - - 003_multiline_venue_number + - 003_consultation_complete - 001_crh_hospitalisation_complete - id: rule_ipp_context_abc12345 @@ -158,6 +158,6 @@ rules: approved_by: responsable_qualite tests: required_case_ids: - - 006_whitelist_phrases_preserved + - 006_trackare_soignants - 001_crh_hospitalisation_complete - 002_imagerie_complete diff --git a/tests/synthetic_review/cases/007_lettre_sortie_complete/expected.txt b/tests/synthetic_review/cases/007_lettre_sortie_complete/expected.txt index ac3ad1b..338ee93 100644 --- a/tests/synthetic_review/cases/007_lettre_sortie_complete/expected.txt +++ b/tests/synthetic_review/cases/007_lettre_sortie_complete/expected.txt @@ -39,4 +39,4 @@ Confraternellement, Dr [NOM] Cardiologue, RPPS : [RPPS] Tel secrétariat : [TEL] -mh.[NOM]@[MASK].fr +[EMAIL] diff --git a/tools/simulate_admin_rule.py b/tools/simulate_admin_rule.py new file mode 100644 index 0000000..fc4a27b --- /dev/null +++ b/tools/simulate_admin_rule.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +Simule l'effet d'une règle d'administration sur un texte ou sur le corpus synthétique. + +Usage : + # Appliquer une règle à un texte libre + python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask \\ + --text "Consulté au CHCB le 12/06/2024." 
+ + # Appliquer à un fichier texte + python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask \\ + --file path/to/document.txt + + # Valider la règle sur ses required_case_ids (--corpus) + python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask --corpus + + # Valider TOUTES les règles actives sur leurs corpus + python tools/simulate_admin_rule.py --all --corpus +""" +from __future__ import annotations + +import argparse +import re +import sys +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from admin_rules import ( # noqa: E402 + compile_active_admin_rules, + load_effective_admin_rules_dict, +) + +CASES_DIR = ROOT / "tests" / "synthetic_review" / "cases" + + +# --------------------------------------------------------------------------- +# Application isolée d'une règle compilée sur un texte +# --------------------------------------------------------------------------- + +def _apply_rule_to_text(rule: dict[str, Any], text: str) -> tuple[str, list[tuple[str, str]]]: + """ + Applique une règle compilée (format retourné par compile_active_admin_rules) + et retourne (texte_masqué, [(original, placeholder), ...]). 
def _apply_rule_to_text(rule: dict[str, Any], text: str) -> tuple[str, list[tuple[str, str]]]:
    """Apply a single compiled rule to *text*, in isolation from the pipeline.

    Args:
        rule: Compiled rule dict (see ``_compile_single_rule``). Keys read here:
            ``type``, ``placeholder`` (defaults to ``"[MASK]"``),
            ``match.exact_value`` and ``normalization`` for ``exact_term``
            rules, and ``patterns`` (compiled regexes) for identifier rules.
        text: The text to mask.

    Returns:
        ``(masked_text, hits)`` where every hit is ``(original, placeholder)``.
        ``preserve_phrase`` rules and unknown rule types never mask anything,
        so they return the text unchanged with an empty hit list.
    """
    placeholder = rule.get("placeholder", "[MASK]")
    hits: list[tuple[str, str]] = []
    rule_type = rule.get("type")

    if rule_type == "exact_term":
        # ``match`` may be an explicit null in the YAML; treat it like "absent".
        raw = (rule.get("match") or {}).get("exact_value", "")
        if not raw:
            return text, []
        norm = rule.get("normalization") or {}
        flags = re.IGNORECASE if norm.get("case_insensitive") else 0
        boundary = r"\b" if norm.get("whole_word", True) else ""
        pattern = re.compile(rf"{boundary}{re.escape(raw)}{boundary}", flags)

        def _repl(m: re.Match) -> str:
            hits.append((m.group(0), placeholder))
            return placeholder

        text = pattern.sub(_repl, text)

    elif rule_type in {"normalized_identifier", "contextual_identifier"}:

        def _repl_pat(m: re.Match) -> str:
            match_start, match_end = m.span(0)
            # Mask group 1 when it took part in the match, otherwise the whole
            # match. span(1) is (-1, -1) for a non-participating group, and a
            # group captured inside a lookaround can lie outside the match;
            # both cases fall back to the whole match.
            start, end = m.span(1) if m.lastindex else (-1, -1)
            if not match_start <= start <= end <= match_end:
                start, end = match_start, match_end
            hits.append((m.string[start:end], placeholder))
            # Substitute by position, not by value: the captured text may also
            # occur earlier in the surrounding context (e.g. "ID 12 : 12").
            full = m.group(0)
            return full[: start - match_start] + placeholder + full[end - match_start:]

        for pat in rule.get("patterns") or []:
            text = pat.sub(_repl_pat, text)

    return text, hits


def _compile_single_rule(rule_raw: dict[str, Any]) -> dict[str, Any]:
    """Compile one raw rule into the dict shape consumed by ``_apply_rule_to_text``.

    Identifier rules (``normalized_identifier`` / ``contextual_identifier``) are
    delegated to ``admin_rules._compile_identifier_rule`` and then completed
    with the ``match``, ``normalization`` and ``placeholder`` fields. Every
    other rule type gets a plain dict with an empty ``patterns`` list.

    Null ``match`` / ``normalization`` values are normalised to empty dicts so
    downstream lookups never hit ``None``.
    """
    rule_type = rule_raw.get("type")
    shared = {
        "match": rule_raw.get("match") or {},
        "normalization": rule_raw.get("normalization") or {},
        "placeholder": rule_raw.get("placeholder", "[MASK]"),
    }
    if rule_type in {"normalized_identifier", "contextual_identifier"}:
        # Imported lazily: only identifier rules need the project compiler.
        from admin_rules import _compile_identifier_rule  # noqa: PLC0415

        compiled = _compile_identifier_rule(rule_raw)
        compiled.update(shared)
        return compiled
    return {
        "id": rule_raw.get("id"),
        "type": rule_type,
        "action": rule_raw.get("action"),
        **shared,
        "patterns": [],
    }
# ---------------------------------------------------------------------------
# Rule loading
# ---------------------------------------------------------------------------

def _load_rule_by_id(rule_id: str) -> dict[str, Any] | None:
    """Return the raw rule whose ``id`` equals *rule_id*, or ``None`` if absent."""
    data = load_effective_admin_rules_dict()
    # ``rules`` may be an explicit null in the configuration.
    for rule in data.get("rules") or []:
        if rule.get("id") == rule_id:
            return rule
    return None


def _load_all_active_rules() -> list[dict[str, Any]]:
    """Return every raw rule whose ``status`` is ``"active"``."""
    data = load_effective_admin_rules_dict()
    return [r for r in data.get("rules") or [] if r.get("status") == "active"]


def _get_required_case_ids(rule: dict[str, Any]) -> list[str]:
    """Return ``governance.tests.required_case_ids`` for *rule* (``[]`` if unset).

    Every level of the path may be missing or null; all of those yield ``[]``.
    """
    tests = (rule.get("governance") or {}).get("tests") or {}
    return tests.get("required_case_ids") or []


# ---------------------------------------------------------------------------
# Free-text simulation mode
# ---------------------------------------------------------------------------

def simulate_text(rule: dict[str, Any], text: str, verbose: bool = True) -> bool:
    """Simulate *rule* on *text* and print the outcome.

    Args:
        rule: Raw rule dict (compiled internally).
        text: Text to run the rule against.
        verbose: When true, also print the full masked text after the hit list.

    Returns:
        ``True`` if the rule masked at least one occurrence.
    """
    compiled = _compile_single_rule(rule)
    masked, hits = _apply_rule_to_text(compiled, text)

    rule_id = rule.get("id", "?")
    status = rule.get("status", "?")
    print(f"\n{'='*60}")
    print(f"Règle : {rule_id} [status={status}]")
    print(f"Type : {rule.get('type')} / action={rule.get('action')}")
    print(f"{'='*60}")

    if hits:
        print(f"\n{len(hits)} occurrence(s) masquée(s) :")
        for original, ph in hits:
            print(f" '{original}' → {ph}")
        if verbose:
            print("\nTexte masqué :")
            print("-" * 40)
            print(masked)
            print("-" * 40)
    else:
        print("\nAucune occurrence masquée.")

    return bool(hits)


# ---------------------------------------------------------------------------
# Corpus validation mode
# ---------------------------------------------------------------------------

def validate_corpus(rule: dict[str, Any]) -> dict[str, Any]:
    """Validate *rule* against the corpus cases listed in its ``required_case_ids``.

    For each case the function checks that:
      - the rule produces at least one hit (except for ``preserve_phrase``);
      - none of the ``must_contain`` terms of ``expectations.json`` is
        destroyed by the rule.

    The global ``must_not_contain`` expectations are deliberately NOT checked:
    they exercise the whole pipeline, not one isolated rule.

    Returns:
        ``{case_id: {"ok": bool, "failures": [...], "hits": int}}``; an empty
        dict when the rule declares no required case.
    """
    import json  # noqa: PLC0415

    required_ids = _get_required_case_ids(rule)
    if not required_ids:
        return {}

    compiled = _compile_single_rule(rule)
    rule_type = rule.get("type")
    match_cfg = rule.get("match") or {}
    results: dict[str, Any] = {}

    for case_id in required_ids:
        case_dir = CASES_DIR / case_id
        if not case_dir.exists():
            results[case_id] = {
                "ok": False,
                "failures": [f"Répertoire introuvable : {case_dir}"],
                "hits": 0,
            }
            continue

        test_file = case_dir / "test.txt"
        if not test_file.exists():
            results[case_id] = {"ok": False, "failures": ["test.txt absent"], "hits": 0}
            continue

        text = test_file.read_text(encoding="utf-8")
        masked, hits = _apply_rule_to_text(compiled, text)

        failures: list[str] = []

        # A malformed expectations.json is reported as a failure of this case
        # instead of aborting the validation of every remaining case.
        expectations: Any = {}
        exp_file = case_dir / "expectations.json"
        if exp_file.exists():
            try:
                expectations = json.loads(exp_file.read_text(encoding="utf-8"))
            except json.JSONDecodeError as exc:
                failures.append(f"expectations.json invalide : {exc}")
        must_contain = (expectations.get("must_contain") or []) if isinstance(expectations, dict) else []

        # exact_term / normalized_identifier / contextual_identifier rules must
        # mask at least one occurrence in the test document.
        if rule_type != "preserve_phrase" and not hits:
            target = match_cfg.get("exact_value") or match_cfg.get("canonical_value") or "?"
            failures.append(f"0 occurrence de '{target}' masquée — la règle n'a aucun effet sur ce cas")

        # The rule must not destroy the preserved medical terms.
        for term in must_contain:
            if term not in masked:
                failures.append(f"sur-masquage : '{term}' disparu après simulation")

        results[case_id] = {"ok": not failures, "failures": failures, "hits": len(hits)}

    return results


# ---------------------------------------------------------------------------
# Corpus report
# ---------------------------------------------------------------------------

def print_corpus_report(rule: dict[str, Any], results: dict[str, Any]) -> bool:
    """Print the per-case report for *rule* and return ``True`` when all cases pass.

    An empty *results* dict (no required case declared) counts as success.
    """
    rule_id = rule.get("id", "?")
    print(f"\n{'='*60}")
    print(f"Validation corpus : {rule_id}")
    print(f"{'='*60}")

    if not results:
        print("Aucun required_case_id défini dans governance.tests.")
        return True

    all_ok = True
    for case_id, res in results.items():
        failures = res.get("failures", [])
        case_ok = bool(res["ok"]) and not failures
        status = "OK" if res["ok"] else "FAIL"
        hits = res.get("hits", 0)
        print(f" [{status}] {case_id} ({hits} occurrence(s) masquée(s))")
        for failure in failures:
            print(f" - {failure}")
        if not case_ok:
            all_ok = False

    print()
    if all_ok:
        print("Validation OK — la règle couvre tous ses cas de test.")
    else:
        print("Validation ÉCHEC — corriger les écarts ci-dessus.")

    return all_ok


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main(argv: list[str] | None = None) -> int:
    """Run the CLI.

    Args:
        argv: Argument list to parse; ``None`` (the default) means
            ``sys.argv[1:]``, which is the behaviour of the script entry point.

    Returns:
        Process exit code: ``0`` on success, ``1`` when a corpus validation
        fails (or, with ``--quiet``, when a text simulation finds no match),
        ``2`` when the rule id is unknown or the input file cannot be read.
    """
    parser = argparse.ArgumentParser(
        description="Simuler / valider une règle d'administration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--rule-id", metavar="ID", help="Identifiant de la règle à simuler")
    target.add_argument("--all", action="store_true", help="Traiter toutes les règles actives")

    source = parser.add_mutually_exclusive_group()
    source.add_argument("--text", metavar="TEXT", help="Texte libre à tester")
    source.add_argument("--file", metavar="FILE", type=Path, help="Fichier texte à tester")

    parser.add_argument(
        "--corpus",
        action="store_true",
        help="Valider sur les required_case_ids définis dans la règle",
    )
    parser.add_argument("--quiet", action="store_true", help="Sortie minimale (code retour uniquement)")

    args = parser.parse_args(argv)

    if not args.corpus and args.text is None and args.file is None:
        parser.error("Fournir --text, --file ou --corpus.")

    rules: list[dict[str, Any]] = []
    if args.all:
        rules = _load_all_active_rules()
        if not rules:
            print("Aucune règle active trouvée.")
            return 0
    else:
        rule = _load_rule_by_id(args.rule_id)
        if rule is None:
            print(f"Règle '{args.rule_id}' introuvable dans la configuration.", file=sys.stderr)
            return 2
        rules = [rule]

    # Resolve the input text once: with --all the same text is simulated
    # against every rule, so re-reading the file per rule is wasted I/O.
    text = ""
    if not args.corpus:
        if args.file is not None:
            try:
                text = args.file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as exc:
                print(f"Impossible de lire '{args.file}' : {exc}", file=sys.stderr)
                return 2
        else:
            text = args.text or ""

    exit_code = 0

    for rule in rules:
        if args.corpus:
            results = validate_corpus(rule)
            if not print_corpus_report(rule, results):
                exit_code = 1
        else:
            found = simulate_text(rule, text, verbose=not args.quiet)
            # Only --quiet turns "no match" into a non-zero exit code.
            if not found and args.quiet:
                exit_code = 1

    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())