170 lines
5.2 KiB
Python
170 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Run the synthetic human-review corpus and produce the diffs.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import difflib
|
|
import json
|
|
import shutil
|
|
import sys
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
# Directory one level above the folder that contains this file
# (parents[0] is the file's own folder, parents[1] its parent).
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT at the front of sys.path, once, before the imports that follow;
# presumably those modules live under ROOT -- confirm against the repo layout.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
|
|
|
|
from anonymizer_core_refactored_onnx import ( # noqa: E402
|
|
anonymise_document_regex,
|
|
load_dictionaries,
|
|
selective_rescan,
|
|
)
|
|
from evaluation.leak_scanner import LeakScanner # noqa: E402
|
|
|
|
|
|
# Location of the synthetic review corpus, relative to ROOT.
CORPUS_DIR = ROOT / "tests" / "synthetic_review"
# main() treats every sub-directory of CASES_DIR as one case.
CASES_DIR = CORPUS_DIR / "cases"
# run_case() writes each case's outputs to ACTUAL_DIR / <case name>.
ACTUAL_DIR = CORPUS_DIR / "actual"
# Single scanner instance, created at import time and used by run_case().
SCANNER = LeakScanner()
|
|
|
|
|
|
def normalize_text(text: str) -> str:
    """Return *text* in a canonical form suitable for line-by-line comparison.

    CRLF and lone CR line endings become LF, leading and trailing whitespace
    of the whole text is removed, trailing whitespace is removed from every
    line, and the result always ends with exactly one newline.
    """
    unified = text.replace("\r\n", "\n").replace("\r", "\n")
    cleaned_lines = [row.rstrip() for row in unified.strip().splitlines()]
    return "\n".join(cleaned_lines) + "\n"
|
|
|
|
|
|
def load_expectations(case_dir: Path) -> dict:
    """Load the optional ``expectations.json`` file of a case.

    Args:
        case_dir: Directory of the case.

    Returns:
        The decoded JSON content of ``expectations.json`` (read as UTF-8),
        or an empty dict when the case has no such file.
    """
    spec_file = case_dir / "expectations.json"
    if spec_file.exists():
        raw = spec_file.read_text(encoding="utf-8")
        return json.loads(raw)
    return {}
|
|
|
|
|
|
def build_leak_scan_seed(audit: list[dict]) -> list[dict]:
    """Select the audit entries that are worth searching for as leaks.

    Values that are too short or too ambiguous are dropped to avoid false
    positives: once spaces are removed, a value must be at least 4 characters
    long, or at least 6 when it consists only of digits.

    Args:
        audit: Audit entries; each may carry an ``"original"`` value and must
            carry a ``"kind"`` if its value is retained.

    Returns:
        A list of ``{"kind", "original"}`` dicts, in audit order, where
        ``"original"`` is the stripped string form of the audited value.

    Raises:
        KeyError: If a retained entry has no ``"kind"`` key.
    """
    seed = []
    for item in audit:
        raw = item.get("original")
        # A None value must count as empty: str(None) would yield the literal
        # "None", which is long enough to pass the filter and become a seed.
        original = "" if raw is None else str(raw).strip()
        compact = original.replace(" ", "")
        # Purely numeric values need more characters before they are
        # distinctive enough to search for.
        min_length = 6 if compact.isdigit() else 4
        if len(compact) < min_length:
            continue
        seed.append({"kind": item["kind"], "original": original})
    return seed
|
|
|
|
|
|
def run_case(case_dir: Path) -> dict:
    """Run one corpus case and report how its output compares to expectations.

    The case directory must contain ``test.txt`` (input) and ``expected.txt``
    (reference output); ``config_overlay.yml`` and ``expectations.json`` are
    optional. The produced text, the audit list, a summary and a unified diff
    against the reference are written to ``ACTUAL_DIR / case_dir.name``, which
    is deleted first if it already exists.

    Args:
        case_dir: Directory holding the files of a single case.

    Returns:
        A dict with the case name (``"case"``), the list of failure labels
        (``"failures"``, empty when nothing went wrong) and the directory
        that received the output files (``"output_dir"``).
    """
    overlay_path = case_dir / "config_overlay.yml"
    cfg = load_dictionaries(overlay_path if overlay_path.exists() else None)

    raw_input = (case_dir / "test.txt").read_text(encoding="utf-8")
    reference = normalize_text((case_dir / "expected.txt").read_text(encoding="utf-8"))
    expectations = load_expectations(case_dir)

    anonymised = anonymise_document_regex([raw_input], [[]], cfg)
    produced = normalize_text(selective_rescan(anonymised.text_out, cfg))

    audit = []
    for hit in anonymised.audit:
        audit.append(
            {
                "kind": hit.kind,
                "original": hit.original,
                "replacement": hit.placeholder,
            }
        )

    kinds = [entry["kind"] for entry in audit]
    kinds_present = sorted(set(kinds))
    summary = {
        "kinds_present": kinds_present,
        "kind_counts": dict(sorted(Counter(kinds).items())),
        "audit_len": len(audit),
        "leaks": SCANNER.scan_text(produced, build_leak_scan_seed(audit)),
    }

    # Start from an empty output directory so stale files never survive a run.
    out_dir = ACTUAL_DIR / case_dir.name
    if out_dir.exists():
        shutil.rmtree(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    (out_dir / "actual.txt").write_text(produced, encoding="utf-8")
    for filename, payload in (
        ("actual.audit.json", audit),
        ("actual.summary.json", summary),
    ):
        serialized = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
        (out_dir / filename).write_text(serialized, encoding="utf-8")

    diff_text = "".join(
        difflib.unified_diff(
            reference.splitlines(keepends=True),
            produced.splitlines(keepends=True),
            fromfile=f"{case_dir.name}/expected.txt",
            tofile=f"{case_dir.name}/actual.txt",
        )
    )
    (out_dir / "diff.txt").write_text(diff_text, encoding="utf-8")

    # Failure labels are collected in a fixed order: text, leaks, kinds,
    # required snippets, forbidden snippets.
    failures = []
    if produced != reference:
        failures.append("text_diff")
    if summary["leaks"]:
        failures.append("leak_detected")

    missing = sorted(
        kind
        for kind in expectations.get("required_kinds", [])
        if kind not in kinds_present
    )
    if missing:
        failures.append("missing_kinds:" + ",".join(missing))

    failures.extend(
        f"missing_text:{snippet}"
        for snippet in expectations.get("must_contain", [])
        if snippet not in produced
    )
    failures.extend(
        f"forbidden_text:{snippet}"
        for snippet in expectations.get("must_not_contain", [])
        if snippet in produced
    )

    return {
        "case": case_dir.name,
        "failures": failures,
        "output_dir": str(out_dir),
    }
|
|
|
|
|
|
def main() -> int:
    """Run every case under ``CASES_DIR`` and print one status per case.

    Each sub-directory of ``CASES_DIR`` is run as a case, in sorted order.
    For every case a ``[OK]`` or ``[FAIL]`` line is printed, followed by the
    directory holding its outputs.

    Returns:
        ``1`` when ``--strict`` was given and at least one case reported
        failures, ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(description="Exécuter le corpus synthétique de revue humaine")
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Retourne un code non nul si un cas diffère de l'attendu.",
    )
    options = parser.parse_args()

    ACTUAL_DIR.mkdir(parents=True, exist_ok=True)
    case_dirs = sorted(entry for entry in CASES_DIR.iterdir() if entry.is_dir())
    outcomes = [run_case(case_dir) for case_dir in case_dirs]

    for outcome in outcomes:
        labels = outcome["failures"]
        if labels:
            status_line = f"[FAIL] {outcome['case']}: {', '.join(labels)}"
        else:
            status_line = f"[OK] {outcome['case']}"
        print(status_line)
        print(f" -> {outcome['output_dir']}")

    any_failed = any(outcome["failures"] for outcome in outcomes)
    # Without --strict the exit status stays 0 even when cases failed.
    return 1 if options.strict and any_failed else 0
|
|
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|