Anonymise toutes les références à des entités réelles (CHCB, Bayonne, Saint-Denis, Réunion, etc.) dans le code source, les configurations YAML, les scripts/outils, et les tests unitaires. Conserve les tests synthétiques (cases) intentionnels. - profile key chcb_strict → chuxx_strict - CHCB → CHUXX, Bayonne → Chicago, Saint-Denis → Springfield, Réunion → Province Bêta, 64100/97400 → 12345, FINESS → 999999999, préfixe tél 05.59.44 → 0X.XX.XX - renomme tools/test_chcb_leak.py → tools/test_force_term_leak.py Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
320 lines
11 KiB
Python
320 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simule l'effet d'une règle d'administration sur un texte ou sur le corpus synthétique.
|
|
|
|
Usage :
|
|
# Appliquer une règle à un texte libre
|
|
python tools/simulate_admin_rule.py --rule-id rule_chuxx_exact_mask \\
|
|
--text "Consulté au CHUXX le 12/06/2024."
|
|
|
|
# Appliquer à un fichier texte
|
|
python tools/simulate_admin_rule.py --rule-id rule_chuxx_exact_mask \\
|
|
--file path/to/document.txt
|
|
|
|
# Valider la règle sur ses required_case_ids (--corpus)
|
|
python tools/simulate_admin_rule.py --rule-id rule_chuxx_exact_mask --corpus
|
|
|
|
# Valider TOUTES les règles actives sur leurs corpus
|
|
python tools/simulate_admin_rule.py --all --corpus
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from admin_rules import ( # noqa: E402
|
|
compile_active_admin_rules,
|
|
load_effective_admin_rules_dict,
|
|
)
|
|
|
|
CASES_DIR = ROOT / "tests" / "synthetic_review" / "cases"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Application isolée d'une règle compilée sur un texte
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _apply_rule_to_text(rule: dict[str, Any], text: str) -> tuple[str, list[tuple[str, str]]]:
    """Apply a single compiled rule to ``text``, in isolation from the pipeline.

    Args:
        rule: Compiled rule mapping. Keys read here: ``type``, ``placeholder``
            (defaults to ``"[MASK]"``), ``match["exact_value"]`` and
            ``normalization`` for ``exact_term`` rules, and ``patterns``
            (objects exposing ``sub``, i.e. compiled regexes) for
            ``normalized_identifier`` / ``contextual_identifier`` rules.
        text: Text to mask.

    Returns:
        ``(masked_text, hits)`` where ``hits`` holds one
        ``(original_fragment, placeholder)`` pair per replacement, in the
        order the replacements were performed. ``preserve_phrase`` rules and
        unknown rule types leave the text untouched and yield no hits.
    """
    placeholder = rule.get("placeholder", "[MASK]")
    matches: list[tuple[str, str]] = []
    rule_type = rule.get("type")

    if rule_type == "exact_term":
        # ``match`` may be present but null in the configuration.
        raw = (rule.get("match") or {}).get("exact_value", "")
        if not raw:
            return text, []
        norm = rule.get("normalization", {}) or {}
        flags = re.IGNORECASE if norm.get("case_insensitive") else 0
        # Whole-word matching is the default unless explicitly disabled.
        boundary = r"\b" if norm.get("whole_word", True) else ""
        pattern = re.compile(rf"{boundary}{re.escape(raw)}{boundary}", flags)

        def _repl(m: re.Match) -> str:
            matches.append((m.group(0), placeholder))
            return placeholder

        text = pattern.sub(_repl, text)

    elif rule_type in {"normalized_identifier", "contextual_identifier"}:

        def _repl_pat(m: re.Match) -> str:
            full = m.group(0)
            match_start, match_end = m.span(0)
            # Mask group 1 when it took part in the match; otherwise the
            # whole match (no groups, or group 1 was optional and skipped).
            if m.lastindex and m.group(1) is not None:
                start, end = m.span(1)
            else:
                start, end = match_start, match_end
            captured = m.string[start:end]
            matches.append((captured, placeholder))
            if start < match_start or end > match_end:
                # Group captured inside a lookaround, outside the consumed
                # text: keep the historical textual replacement.
                return full.replace(captured, placeholder, 1)
            # Replace by position so that an identical value appearing
            # earlier in the match (e.g. in the label) is left alone.
            return full[: start - match_start] + placeholder + full[end - match_start:]

        for pat in rule.get("patterns") or []:
            text = pat.sub(_repl_pat, text)

    # "preserve_phrase" (and any unknown type) performs no masking.
    return text, matches
|
|
|
|
|
|
def _compile_single_rule(rule_raw: dict[str, Any]) -> dict[str, Any]:
    """Build, from one raw rule, the mapping consumed by ``_apply_rule_to_text``.

    Identifier rules (``normalized_identifier`` / ``contextual_identifier``)
    are delegated to ``admin_rules._compile_identifier_rule``; every other
    type gets a plain mapping with an empty ``patterns`` list. In both cases
    ``match``, ``normalization`` and ``placeholder`` are copied over from the
    raw rule, defaulting to ``{}``, ``{}`` and ``"[MASK]"``.
    """
    kind = rule_raw.get("type")
    is_identifier = kind in {"normalized_identifier", "contextual_identifier"}

    if is_identifier:
        from admin_rules import _compile_identifier_rule  # noqa: PLC0415

        compiled = _compile_identifier_rule(rule_raw)
    else:
        compiled = {
            "id": rule_raw.get("id"),
            "type": kind,
            "action": rule_raw.get("action"),
        }

    # Fields shared by both shapes, read straight from the raw rule.
    for key, fallback in (("match", {}), ("normalization", {}), ("placeholder", "[MASK]")):
        compiled[key] = rule_raw.get(key, fallback)

    if not is_identifier:
        compiled["patterns"] = []
    return compiled
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chargement des règles
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_rule_by_id(rule_id: str) -> dict[str, Any] | None:
    """Return the first configured rule whose ``id`` equals ``rule_id``.

    The rules are read from the ``rules`` list of
    ``load_effective_admin_rules_dict()``; ``None`` is returned when no rule
    carries that id.
    """
    candidates = load_effective_admin_rules_dict().get("rules", [])
    return next((entry for entry in candidates if entry.get("id") == rule_id), None)
|
|
|
|
|
|
def _load_all_active_rules() -> list[dict[str, Any]]:
    """Return, in configuration order, the rules whose ``status`` is ``"active"``."""

    def _is_active(entry: dict[str, Any]) -> bool:
        return entry.get("status") == "active"

    every_rule = load_effective_admin_rules_dict().get("rules", [])
    return list(filter(_is_active, every_rule))
|
|
|
|
|
|
def _get_required_case_ids(rule: dict[str, Any]) -> list[str]:
    """Return ``rule["governance"]["tests"]["required_case_ids"]``, or ``[]``.

    Each level of the path may be missing or null (for example an empty
    ``tests:`` key in the configuration); all of those cases yield an empty
    list instead of raising.
    """
    governance = rule.get("governance") or {}
    tests = governance.get("tests") or {}
    return tests.get("required_case_ids") or []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mode simulation texte libre
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def simulate_text(rule: dict[str, Any], text: str, verbose: bool = True) -> bool:
    """Run ``rule`` against ``text`` and print a short report on stdout.

    Args:
        rule: Raw rule; it is compiled with ``_compile_single_rule`` first.
        text: Text to simulate the rule on.
        verbose: When true and at least one occurrence was masked, the full
            masked text is printed after the list of occurrences.

    Returns:
        ``True`` when the rule masked at least one occurrence.
    """
    masked, hits = _apply_rule_to_text(_compile_single_rule(rule), text)

    # Header: rule identity and kind.
    rule_id = rule.get("id", "?")
    status = rule.get("status", "?")
    print(f"\n{'='*60}")
    print(f"Règle : {rule_id} [status={status}]")
    print(f"Type : {rule.get('type')} / action={rule.get('action')}")
    print(f"{'='*60}")

    if not hits:
        print("\nAucune occurrence masquée.")
        return False

    print(f"\n{len(hits)} occurrence(s) masquée(s) :")
    for original, ph in hits:
        print(f" '{original}' → {ph}")

    if verbose:
        print("\nTexte masqué :")
        print("-" * 40)
        print(masked)
        print("-" * 40)

    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mode validation corpus
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def validate_corpus(rule: dict[str, Any]) -> dict[str, Any]:
    """Validate ``rule`` against the synthetic cases it declares as required.

    For each id returned by ``_get_required_case_ids`` the case directory
    under ``CASES_DIR`` is loaded and the rule is applied, alone, to its
    ``test.txt``. A case fails when:

    - the case directory or its ``test.txt`` is missing;
    - ``expectations.json`` exists but cannot be read as a JSON object;
    - the rule is not a ``preserve_phrase`` rule and masked nothing;
    - a ``must_contain`` term of ``expectations.json`` is present in the
      input text but no longer present after the rule was applied.

    ``must_contain`` terms that are absent from the input are ignored: an
    isolated rule cannot have removed them. ``must_not_contain`` is not
    checked at all, since it describes the whole pipeline, not one rule.

    Returns:
        ``{case_id: {"ok": bool, "failures": [str, ...], "hits": int}}``.
        Entries for a missing directory or missing ``test.txt`` carry no
        ``hits`` key. An empty dict means the rule declares no required case.
    """
    import json  # noqa: PLC0415

    required_ids = _get_required_case_ids(rule)
    if not required_ids:
        return {}

    compiled = _compile_single_rule(rule)
    rule_type = rule.get("type")
    results: dict[str, Any] = {}

    for case_id in required_ids:
        case_dir = CASES_DIR / case_id
        if not case_dir.exists():
            results[case_id] = {"ok": False, "failures": [f"Répertoire introuvable : {case_dir}"]}
            continue

        test_file = case_dir / "test.txt"
        if not test_file.exists():
            results[case_id] = {"ok": False, "failures": ["test.txt absent"]}
            continue

        text = test_file.read_text(encoding="utf-8")
        masked, hits = _apply_rule_to_text(compiled, text)

        failures: list[str] = []

        # A broken expectations file fails this case only; the remaining
        # cases (and rules) are still validated.
        expectations: Any = {}
        exp_file = case_dir / "expectations.json"
        if exp_file.exists():
            try:
                expectations = json.loads(exp_file.read_text(encoding="utf-8"))
            except (OSError, ValueError) as exc:  # ValueError covers JSON and decoding errors
                failures.append(f"expectations.json illisible : {exc}")
        if not isinstance(expectations, dict):
            failures.append("expectations.json invalide : objet JSON attendu")
            expectations = {}
        must_contain = expectations.get("must_contain") or []

        # exact_term / normalized_identifier / contextual_identifier rules
        # must mask at least one occurrence in the test document.
        if rule_type != "preserve_phrase" and not hits:
            match_cfg = rule.get("match") or {}
            target = match_cfg.get("exact_value") or match_cfg.get("canonical_value") or "?"
            failures.append(f"0 occurrence de '{target}' masquée — la règle n'a aucun effet sur ce cas")

        # The rule must not destroy preserved terms. Only terms that were in
        # the input can have been destroyed by it.
        for term in must_contain:
            if term in text and term not in masked:
                failures.append(f"sur-masquage : '{term}' disparu après simulation")

        results[case_id] = {"ok": not failures, "failures": failures, "hits": len(hits)}

    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rapport corpus
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_corpus_report(rule: dict[str, Any], results: dict[str, Any]) -> bool:
    """Print the corpus validation report for ``rule`` and return the verdict.

    Args:
        rule: Rule the report is about; only its ``id`` is displayed.
        results: Mapping ``case_id -> {"ok": bool, "failures": [...],
            "hits": int}``. ``failures`` and ``hits`` are optional.

    Returns:
        ``True`` when ``results`` is empty or every case is ``ok`` with no
        failure line; ``False`` as soon as one case is not ``ok`` or lists a
        failure, so the verdict always agrees with the printed ``[FAIL]``
        markers.
    """
    rule_id = rule.get("id", "?")
    print(f"\n{'='*60}")
    print(f"Validation corpus : {rule_id}")
    print(f"{'='*60}")

    if not results:
        print("Aucun required_case_id défini dans governance.tests.")
        return True

    all_ok = True
    for case_id, res in results.items():
        failures = res.get("failures", [])
        status = "OK" if res["ok"] else "FAIL"
        hits = res.get("hits", 0)
        print(f" [{status}] {case_id} ({hits} occurrence(s) masquée(s))")
        for failure in failures:
            print(f" - {failure}")
        # A case marked as failed counts even when it lists no failure line.
        if failures or not res["ok"]:
            all_ok = False

    print()
    if all_ok:
        print("Validation OK — la règle couvre tous ses cas de test.")
    else:
        print("Validation ÉCHEC — corriger les écarts ci-dessus.")

    return all_ok
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entrée CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Exactly one of ``--rule-id`` / ``--all`` selects the rules. They are then
    either validated against their required corpus cases (``--corpus``) or
    simulated on a free text given by ``--text`` or ``--file``.

    Returns:
        Process exit status:

        - ``0`` on success, including ``--all`` with no active rule;
        - ``1`` when a corpus validation failed, or when ``--quiet`` is set
          and a simulated rule masked nothing;
        - ``2`` when ``--rule-id`` is unknown or ``--file`` cannot be read.
    """
    parser = argparse.ArgumentParser(
        description="Simuler / valider une règle d'administration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--rule-id", metavar="ID", help="Identifiant de la règle à simuler")
    target.add_argument("--all", action="store_true", help="Traiter toutes les règles actives")

    source = parser.add_mutually_exclusive_group()
    source.add_argument("--text", metavar="TEXT", help="Texte libre à tester")
    source.add_argument("--file", metavar="FILE", type=Path, help="Fichier texte à tester")

    parser.add_argument(
        "--corpus",
        action="store_true",
        help="Valider sur les required_case_ids définis dans la règle",
    )
    parser.add_argument("--quiet", action="store_true", help="Sortie minimale (code retour uniquement)")

    args = parser.parse_args()

    if not args.corpus and args.text is None and args.file is None:
        parser.error("Fournir --text, --file ou --corpus.")

    rules: list[dict[str, Any]] = []
    if args.all:
        rules = _load_all_active_rules()
        if not rules:
            print("Aucune règle active trouvée.")
            return 0
    else:
        rule = _load_rule_by_id(args.rule_id)
        if rule is None:
            print(f"Règle '{args.rule_id}' introuvable dans la configuration.", file=sys.stderr)
            return 2
        rules = [rule]

    exit_code = 0

    if args.corpus:
        for rule in rules:
            results = validate_corpus(rule)
            if not print_corpus_report(rule, results):
                exit_code = 1
        return exit_code

    # The input text is the same for every rule: read it once, and report an
    # unreadable file as a CLI error rather than a traceback.
    if args.file:
        try:
            text = args.file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            print(f"Impossible de lire '{args.file}' : {exc}", file=sys.stderr)
            return 2
    else:
        text = args.text or ""

    for rule in rules:
        found = simulate_text(rule, text, verbose=not args.quiet)
        # Without --quiet a simulation with no match is not an error.
        if not found and args.quiet:
            exit_code = 1

    return exit_code
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|