Files
anonymisation/tools/run_synthetic_review_corpus.py

170 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
Exécute le corpus synthétique de revue humaine et produit les diffs.
"""
from __future__ import annotations
import argparse
import difflib
import json
import shutil
import sys
from collections import Counter
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from anonymizer_core_refactored_onnx import ( # noqa: E402
anonymise_document_regex,
load_dictionaries,
selective_rescan,
)
from evaluation.leak_scanner import LeakScanner # noqa: E402
# Root of the synthetic human-review corpus, under the project's tests directory.
CORPUS_DIR = ROOT / "tests" / "synthetic_review"
# Holds one sub-directory per case; main() iterates over these.
CASES_DIR = CORPUS_DIR / "cases"
# run_case() writes each case's generated outputs below this directory.
ACTUAL_DIR = CORPUS_DIR / "actual"
# Single scanner instance, created at import time and reused by run_case().
SCANNER = LeakScanner()
def normalize_text(text: str) -> str:
    """Return *text* in a canonical form suitable for line-by-line comparison.

    CRLF and lone CR line endings become LF, leading and trailing whitespace
    of the whole text is removed, trailing whitespace is stripped from every
    line, and the result always ends with exactly one newline.
    """
    unified = text.replace("\r\n", "\n").replace("\r", "\n")
    trimmed_lines = [row.rstrip() for row in unified.strip().splitlines()]
    return "\n".join(trimmed_lines) + "\n"
def load_expectations(case_dir: Path) -> dict:
    """Load the optional ``expectations.json`` of a case directory.

    Returns the decoded JSON content when the file exists, otherwise an
    empty dict.
    """
    candidate = case_dir / "expectations.json"
    if candidate.exists():
        raw = candidate.read_text(encoding="utf-8")
        return json.loads(raw)
    return {}
def build_leak_scan_seed(audit: list[dict]) -> list[dict]:
    """Build the list of original values the leak scanner should look for.

    Values that are too short or too ambiguous are dropped to avoid false
    positives:

    * entries whose ``original`` is missing or ``None``;
    * values shorter than 4 characters once spaces are removed;
    * purely numeric values shorter than 6 digits once spaces are removed.

    Args:
        audit: Audit entries; each must carry a ``"kind"`` key and may carry
            an ``"original"`` key.

    Returns:
        A list of ``{"kind": ..., "original": ...}`` dicts, in audit order,
        where ``original`` is the stripped string form of the audited value.

    Raises:
        KeyError: If a retained entry has no ``"kind"`` key.
    """
    seed = []
    for item in audit:
        raw_original = item.get("original")
        if raw_original is None:
            # str(None) would yield the literal "None", which is 4 characters
            # long and would be seeded as if it were a real sensitive value.
            continue
        original = str(raw_original).strip()
        # Only plain spaces are ignored when measuring the value's length.
        compact = original.replace(" ", "")
        if len(compact) < 4:
            continue
        if compact.isdigit() and len(compact) < 6:
            continue
        seed.append({"kind": item["kind"], "original": original})
    return seed
def run_case(case_dir: Path) -> dict:
    """Run a single corpus case and write its artefacts to disk.

    Reads ``test.txt`` and ``expected.txt`` from *case_dir*, plus the optional
    ``config_overlay.yml`` and ``expectations.json``. The input is anonymised
    and rescanned, and the normalized result is compared with the normalized
    expected text. The per-case directory under ``ACTUAL_DIR`` is recreated
    from scratch and receives ``actual.txt``, ``actual.audit.json``,
    ``actual.summary.json`` and ``diff.txt``.

    Returns:
        A dict with the case name (``"case"``), the list of failure labels
        (``"failures"``, empty when the case passes) and the output directory
        as a string (``"output_dir"``).
    """
    overlay = case_dir / "config_overlay.yml"
    cfg = load_dictionaries(overlay if overlay.exists() else None)

    raw_input = (case_dir / "test.txt").read_text(encoding="utf-8")
    raw_expected = (case_dir / "expected.txt").read_text(encoding="utf-8")
    expected_text = normalize_text(raw_expected)
    expectations = load_expectations(case_dir)

    anon = anonymise_document_regex([raw_input], [[]], cfg)
    rescanned = selective_rescan(anon.text_out, cfg)
    actual_text = normalize_text(rescanned)

    # Flatten the audit hits into plain dicts so they can be dumped as JSON.
    audit = []
    for hit in anon.audit:
        audit.append(
            {
                "kind": hit.kind,
                "original": hit.original,
                "replacement": hit.placeholder,
            }
        )

    kind_tally = Counter(entry["kind"] for entry in audit)
    kinds_present = sorted(kind_tally)
    summary = {
        "kinds_present": kinds_present,
        "kind_counts": {kind: kind_tally[kind] for kind in kinds_present},
        "audit_len": len(audit),
        "leaks": SCANNER.scan_text(actual_text, build_leak_scan_seed(audit)),
    }

    # Start from an empty output directory so stale artefacts never survive.
    out_dir = ACTUAL_DIR / case_dir.name
    if out_dir.exists():
        shutil.rmtree(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    (out_dir / "actual.txt").write_text(actual_text, encoding="utf-8")
    for filename, payload in (
        ("actual.audit.json", audit),
        ("actual.summary.json", summary),
    ):
        serialized = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
        (out_dir / filename).write_text(serialized, encoding="utf-8")

    diff_text = "".join(
        difflib.unified_diff(
            expected_text.splitlines(keepends=True),
            actual_text.splitlines(keepends=True),
            fromfile=f"{case_dir.name}/expected.txt",
            tofile=f"{case_dir.name}/actual.txt",
        )
    )
    (out_dir / "diff.txt").write_text(diff_text, encoding="utf-8")

    # Failure labels are collected in a fixed order: text, leaks, kinds,
    # required snippets, forbidden snippets.
    failures = []
    if actual_text != expected_text:
        failures.append("text_diff")
    if summary["leaks"]:
        failures.append("leak_detected")

    missing_kinds = sorted(
        kind
        for kind in expectations.get("required_kinds", [])
        if kind not in kinds_present
    )
    if missing_kinds:
        failures.append(f"missing_kinds:{','.join(missing_kinds)}")

    failures.extend(
        f"missing_text:{needle}"
        for needle in expectations.get("must_contain", [])
        if needle not in actual_text
    )
    failures.extend(
        f"forbidden_text:{needle}"
        for needle in expectations.get("must_not_contain", [])
        if needle in actual_text
    )

    return {
        "case": case_dir.name,
        "failures": failures,
        "output_dir": str(out_dir),
    }
def main() -> int:
    """Run every case of the synthetic review corpus and print a report.

    Each sub-directory of ``CASES_DIR`` is run as a case, in sorted order.
    All cases are run before anything is printed; the report then shows one
    status line and one output-directory line per case.

    Returns:
        ``1`` when ``--strict`` was given and at least one case has failures,
        ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Exécuter le corpus synthétique de revue humaine"
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Retourne un code non nul si un cas diffère de l'attendu.",
    )
    options = parser.parse_args()

    ACTUAL_DIR.mkdir(parents=True, exist_ok=True)

    case_dirs = [entry for entry in CASES_DIR.iterdir() if entry.is_dir()]
    case_dirs.sort()

    outcomes = []
    for case_dir in case_dirs:
        outcomes.append(run_case(case_dir))

    for outcome in outcomes:
        problems = outcome["failures"]
        if not problems:
            print(f"[OK] {outcome['case']}")
        else:
            print(f"[FAIL] {outcome['case']}: {', '.join(problems)}")
        print(f" -> {outcome['output_dir']}")

    any_failed = any(outcome["failures"] for outcome in outcomes)
    return 1 if options.strict and any_failed else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())