feat(admin_rules): CLI simulate_admin_rule + fix email avant force_terms
- fix(detect): EMAIL masqué avant _apply_overrides pour éviter que les force_terms (ex: CHCB) ne cassent l'adresse — mh.lafitte@chcb.fr → [EMAIL]
- fix(corpus): expected 007 mis à jour ([EMAIL] à la place de mh.[NOM]@[MASK].fr)
- feat(tools): tools/simulate_admin_rule.py — CLI de simulation et validation isolée d'une règle admin (--text, --file, --corpus, --all)
- fix(admin_rules): required_case_ids corrigés dans admin_rules.default.yml (noms des répertoires du corpus synthétique mis à jour)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1285,15 +1285,15 @@ def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str:
|
||||
|
||||
|
||||
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
|
||||
# user overrides & force-masks d'abord
|
||||
line = _apply_overrides(line, audit, page_idx, cfg)
|
||||
|
||||
# EMAIL
|
||||
# EMAIL avant les overrides : les force_terms (ex: CHCB) casseraient sinon l'adresse
|
||||
def _repl_email(m: re.Match) -> str:
|
||||
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
|
||||
return PLACEHOLDERS["EMAIL"]
|
||||
line = RE_EMAIL.sub(_repl_email, line)
|
||||
|
||||
# user overrides & force-masks
|
||||
line = _apply_overrides(line, audit, page_idx, cfg)
|
||||
|
||||
# URLs (toutes — peuvent identifier établissements, personnes, services)
|
||||
def _repl_url(m: re.Match) -> str:
|
||||
audit.append(PiiHit(page_idx, "URL", m.group(0), PLACEHOLDERS["MASK"]))
|
||||
|
||||
@@ -43,7 +43,7 @@ rules:
|
||||
approved_by: responsable_qualite
|
||||
tests:
|
||||
required_case_ids:
|
||||
- 005_force_mask_default_term
|
||||
- 009_multi_etablissements
|
||||
- 001_crh_hospitalisation_complete
|
||||
|
||||
- id: rule_identifier_1234567
|
||||
@@ -87,7 +87,7 @@ rules:
|
||||
approved_by: null
|
||||
tests:
|
||||
required_case_ids:
|
||||
- 003_multiline_venue_number
|
||||
- 003_consultation_complete
|
||||
- 001_crh_hospitalisation_complete
|
||||
|
||||
- id: rule_ipp_context_abc12345
|
||||
@@ -158,6 +158,6 @@ rules:
|
||||
approved_by: responsable_qualite
|
||||
tests:
|
||||
required_case_ids:
|
||||
- 006_whitelist_phrases_preserved
|
||||
- 006_trackare_soignants
|
||||
- 001_crh_hospitalisation_complete
|
||||
- 002_imagerie_complete
|
||||
|
||||
@@ -39,4 +39,4 @@ Confraternellement,
|
||||
Dr [NOM]
|
||||
Cardiologue, RPPS : [RPPS]
|
||||
Tel secrétariat : [TEL]
|
||||
mh.[NOM]@[MASK].fr
|
||||
[EMAIL]
|
||||
|
||||
319
tools/simulate_admin_rule.py
Normal file
319
tools/simulate_admin_rule.py
Normal file
@@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simule l'effet d'une règle d'administration sur un texte ou sur le corpus synthétique.
|
||||
|
||||
Usage :
|
||||
# Appliquer une règle à un texte libre
|
||||
python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask \\
|
||||
--text "Consulté au CHCB le 12/06/2024."
|
||||
|
||||
# Appliquer à un fichier texte
|
||||
python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask \\
|
||||
--file path/to/document.txt
|
||||
|
||||
# Valider la règle sur ses required_case_ids (--corpus)
|
||||
python tools/simulate_admin_rule.py --rule-id rule_chcb_exact_mask --corpus
|
||||
|
||||
# Valider TOUTES les règles actives sur leurs corpus
|
||||
python tools/simulate_admin_rule.py --all --corpus
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from admin_rules import ( # noqa: E402
|
||||
compile_active_admin_rules,
|
||||
load_effective_admin_rules_dict,
|
||||
)
|
||||
|
||||
CASES_DIR = ROOT / "tests" / "synthetic_review" / "cases"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Application isolée d'une règle compilée sur un texte
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _apply_rule_to_text(rule: dict[str, Any], text: str) -> tuple[str, list[tuple[str, str]]]:
|
||||
"""
|
||||
Applique une règle compilée (format retourné par compile_active_admin_rules)
|
||||
et retourne (texte_masqué, [(original, placeholder), ...]).
|
||||
"""
|
||||
placeholder = rule.get("placeholder", "[MASK]")
|
||||
matches: list[tuple[str, str]] = []
|
||||
|
||||
rule_type = rule.get("type")
|
||||
if rule_type == "exact_term":
|
||||
raw = rule.get("match", {}).get("exact_value", "")
|
||||
if not raw:
|
||||
return text, []
|
||||
norm = rule.get("normalization", {}) or {}
|
||||
flags = re.IGNORECASE if norm.get("case_insensitive") else 0
|
||||
wb = norm.get("whole_word", True)
|
||||
boundary = r"\b" if wb else ""
|
||||
pattern = re.compile(rf"{boundary}{re.escape(raw)}{boundary}", flags)
|
||||
|
||||
def _repl(m: re.Match) -> str:
|
||||
matches.append((m.group(0), placeholder))
|
||||
return placeholder
|
||||
|
||||
text = pattern.sub(_repl, text)
|
||||
|
||||
elif rule_type == "preserve_phrase":
|
||||
pass # pas de masquage
|
||||
|
||||
elif rule_type in {"normalized_identifier", "contextual_identifier"}:
|
||||
for pat in rule.get("patterns", []):
|
||||
def _repl_pat(m: re.Match, _ph: str = placeholder) -> str:
|
||||
captured = m.group(1) if m.lastindex else m.group(0)
|
||||
full = m.group(0)
|
||||
matches.append((captured, _ph))
|
||||
return full.replace(captured, _ph, 1)
|
||||
|
||||
text = pat.sub(_repl_pat, text)
|
||||
|
||||
return text, matches
|
||||
|
||||
|
||||
def _compile_single_rule(rule_raw: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Compile une seule règle brute dans un format applicable par _apply_rule_to_text."""
|
||||
rule_type = rule_raw.get("type")
|
||||
if rule_type in {"normalized_identifier", "contextual_identifier"}:
|
||||
from admin_rules import _compile_identifier_rule # noqa: PLC0415
|
||||
compiled = _compile_identifier_rule(rule_raw)
|
||||
compiled["match"] = rule_raw.get("match", {})
|
||||
compiled["normalization"] = rule_raw.get("normalization", {})
|
||||
compiled["placeholder"] = rule_raw.get("placeholder", "[MASK]")
|
||||
return compiled
|
||||
return {
|
||||
"id": rule_raw.get("id"),
|
||||
"type": rule_type,
|
||||
"action": rule_raw.get("action"),
|
||||
"match": rule_raw.get("match", {}),
|
||||
"normalization": rule_raw.get("normalization", {}),
|
||||
"placeholder": rule_raw.get("placeholder", "[MASK]"),
|
||||
"patterns": [],
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Chargement des règles
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_rule_by_id(rule_id: str) -> dict[str, Any] | None:
    """Return the first configured rule whose ``id`` equals ``rule_id``.

    Rules are read from ``load_effective_admin_rules_dict()`` under the
    ``rules`` key; ``None`` is returned when no rule carries that id.
    """
    config = load_effective_admin_rules_dict()
    candidates = (entry for entry in config.get("rules", []) if entry.get("id") == rule_id)
    return next(candidates, None)
|
||||
|
||||
|
||||
def _load_all_active_rules() -> list[dict[str, Any]]:
    """Return every configured rule whose ``status`` is exactly ``"active"``."""
    configured = load_effective_admin_rules_dict().get("rules", [])
    return [entry for entry in configured if entry.get("status") == "active"]
|
||||
|
||||
|
||||
def _get_required_case_ids(rule: dict[str, Any]) -> list[str]:
|
||||
return (rule.get("governance") or {}).get("tests", {}).get("required_case_ids") or []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mode simulation texte libre
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def simulate_text(rule: dict[str, Any], text: str, verbose: bool = True) -> bool:
    """Run ``rule`` against ``text``, print a report, and say whether it matched.

    The header (rule id, status, type, action) and the list of masked
    occurrences are always printed; the masked text itself is printed only
    when ``verbose`` is true and at least one occurrence was masked.

    Returns:
        ``True`` when the rule masked at least one occurrence.
    """
    masked_text, found = _apply_rule_to_text(_compile_single_rule(rule), text)

    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Règle : {rule.get('id', '?')} [status={rule.get('status', '?')}]")
    print(f"Type : {rule.get('type')} / action={rule.get('action')}")
    print(banner)

    # Guard clause: nothing matched, nothing else to show.
    if not found:
        print("\nAucune occurrence masquée.")
        return False

    print(f"\n{len(found)} occurrence(s) masquée(s) :")
    for source, replacement in found:
        print(f" '{source}' → {replacement}")

    if verbose:
        rule_line = "-" * 40
        print("\nTexte masqué :")
        print(rule_line)
        print(masked_text)
        print(rule_line)

    return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mode validation corpus
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def validate_corpus(rule: dict[str, Any]) -> dict[str, Any]:
    """Validate ``rule`` against the corpus cases listed in its governance.

    For each id in ``governance.tests.required_case_ids`` the rule is applied
    in isolation to ``<CASES_DIR>/<case_id>/test.txt`` and two checks run:

    * the rule must mask at least one occurrence (skipped for
      ``preserve_phrase`` rules);
    * every ``must_contain`` term of ``expectations.json`` that is present in
      the input text must still be present after masking. Terms absent from
      the input are ignored: the rule cannot have destroyed them.

    ``must_not_contain`` is deliberately not checked here; the original note
    states it exercises the whole pipeline rather than a single rule.

    Returns:
        ``{case_id: {"ok": bool, "failures": [...], "hits": int}}``. Entries
        for a missing case directory or missing ``test.txt`` carry no
        ``hits`` key. An empty dict means the rule declares no required cases.
    """
    import json  # noqa: PLC0415

    required_ids = _get_required_case_ids(rule)
    if not required_ids:
        return {}

    compiled = _compile_single_rule(rule)
    rule_type = rule.get("type")
    results: dict[str, Any] = {}

    for case_id in required_ids:
        case_dir = CASES_DIR / case_id
        if not case_dir.exists():
            results[case_id] = {"ok": False, "failures": [f"Répertoire introuvable : {case_dir}"]}
            continue

        test_file = case_dir / "test.txt"
        if not test_file.exists():
            results[case_id] = {"ok": False, "failures": ["test.txt absent"]}
            continue

        text = test_file.read_text(encoding="utf-8")
        masked, hits = _apply_rule_to_text(compiled, text)

        failures: list[str] = []

        # expectations.json is optional; a malformed file is reported as a
        # failure of this case rather than aborting the whole run.
        must_contain: list[str] = []
        exp_file = case_dir / "expectations.json"
        if exp_file.exists():
            try:
                expectations = json.loads(exp_file.read_text(encoding="utf-8"))
            except json.JSONDecodeError as exc:
                failures.append(f"expectations.json invalide : {exc}")
            else:
                must_contain = expectations.get("must_contain") or []

        # Masking rules must have an effect on each of their required cases.
        if rule_type != "preserve_phrase" and not hits:
            match_cfg = rule.get("match") or {}
            target = match_cfg.get("exact_value") or match_cfg.get("canonical_value") or "?"
            failures.append(f"0 occurrence de '{target}' masquée — la règle n'a aucun effet sur ce cas")

        # Over-masking: a preserved term that was in the input has vanished.
        for term in must_contain:
            if term in text and term not in masked:
                failures.append(f"sur-masquage : '{term}' disparu après simulation")

        results[case_id] = {"ok": not failures, "failures": failures, "hits": len(hits)}

    return results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rapport corpus
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_corpus_report(rule: dict[str, Any], results: dict[str, Any]) -> bool:
    """Print the per-case validation report and return the overall verdict.

    Args:
        rule: Raw rule dict; only its ``id`` is read, for the header.
        results: Mapping ``case_id -> {"ok": bool, "failures": [...],
            "hits": int}``. ``failures`` and ``hits`` are optional.

    Returns:
        ``True`` when ``results`` is empty or every case is ``ok`` with no
        recorded failure; ``False`` otherwise. The verdict follows the ``ok``
        flag as well as the failure list, so a case shown as ``[FAIL]`` can
        never be followed by "Validation OK".
    """
    rule_id = rule.get("id", "?")
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Validation corpus : {rule_id}")
    print(banner)

    if not results:
        print("Aucun required_case_id défini dans governance.tests.")
        return True

    all_ok = True
    for case_id, res in results.items():
        failures = res.get("failures", [])
        status = "OK" if res["ok"] else "FAIL"
        hits = res.get("hits", 0)
        print(f" [{status}] {case_id} ({hits} occurrence(s) masquée(s))")
        for failure in failures:
            print(f" - {failure}")
        if not res["ok"] or failures:
            all_ok = False

    print()
    if all_ok:
        print("Validation OK — la règle couvre tous ses cas de test.")
    else:
        print("Validation ÉCHEC — corriger les écarts ci-dessus.")

    return all_ok
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entrée CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> int:
    """Command-line entry point.

    Parses the arguments, loads either one rule (``--rule-id``) or every
    active rule (``--all``), then for each rule either validates it on its
    corpus cases (``--corpus``) or simulates it on the supplied ``--text`` /
    ``--file`` content.

    Returns:
        ``0`` on success; ``1`` when a corpus validation fails, or when
        ``--quiet`` is set and a rule masks nothing; ``2`` when the requested
        rule does not exist or the input file cannot be read.
    """
    parser = argparse.ArgumentParser(
        description="Simuler / valider une règle d'administration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--rule-id", metavar="ID", help="Identifiant de la règle à simuler")
    target.add_argument("--all", action="store_true", help="Traiter toutes les règles actives")

    source = parser.add_mutually_exclusive_group()
    source.add_argument("--text", metavar="TEXT", help="Texte libre à tester")
    source.add_argument("--file", metavar="FILE", type=Path, help="Fichier texte à tester")

    parser.add_argument(
        "--corpus",
        action="store_true",
        help="Valider sur les required_case_ids définis dans la règle",
    )
    parser.add_argument("--quiet", action="store_true", help="Sortie minimale (code retour uniquement)")

    args = parser.parse_args()

    if not args.corpus and args.text is None and args.file is None:
        parser.error("Fournir --text, --file ou --corpus.")

    rules: list[dict[str, Any]] = []
    if args.all:
        rules = _load_all_active_rules()
        if not rules:
            print("Aucune règle active trouvée.")
            return 0
    else:
        rule = _load_rule_by_id(args.rule_id)
        if rule is None:
            print(f"Règle '{args.rule_id}' introuvable dans la configuration.", file=sys.stderr)
            return 2
        rules = [rule]

    # The input text does not depend on the rule: read it once, and turn an
    # unreadable file into a clean error instead of a traceback. --corpus
    # takes precedence, in which case the text is never needed.
    text = ""
    if not args.corpus:
        if args.file:
            try:
                text = args.file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as exc:
                print(f"Lecture impossible de '{args.file}' : {exc}", file=sys.stderr)
                return 2
        else:
            text = args.text or ""

    exit_code = 0
    for rule in rules:
        if args.corpus:
            results = validate_corpus(rule)
            if not print_corpus_report(rule, results):
                exit_code = 1
        else:
            found = simulate_text(rule, text, verbose=not args.quiet)
            # A rule that masks nothing is only an error in --quiet mode.
            if not found and args.quiet:
                exit_code = 1

    return exit_code
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user