#!/usr/bin/env python3 """ Exécute le corpus synthétique de revue humaine et produit les diffs. """ from __future__ import annotations import argparse import difflib import json import shutil import sys from collections import Counter from pathlib import Path ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from anonymizer_core_refactored_onnx import ( # noqa: E402 anonymise_document_regex, load_dictionaries, selective_rescan, ) from evaluation.leak_scanner import LeakScanner # noqa: E402 CORPUS_DIR = ROOT / "tests" / "synthetic_review" CASES_DIR = CORPUS_DIR / "cases" ACTUAL_DIR = CORPUS_DIR / "actual" SCANNER = LeakScanner() def normalize_text(text: str) -> str: text = text.replace("\r\n", "\n").replace("\r", "\n") return "\n".join(line.rstrip() for line in text.strip().splitlines()) + "\n" def load_expectations(case_dir: Path) -> dict: expectations_path = case_dir / "expectations.json" if not expectations_path.exists(): return {} return json.loads(expectations_path.read_text(encoding="utf-8")) def build_leak_scan_seed(audit: list[dict]) -> list[dict]: """Évite les faux positifs sur les valeurs trop courtes ou ambiguës.""" seed = [] for item in audit: original = str(item.get("original", "")).strip() compact = original.replace(" ", "") if len(compact) < 4: continue if compact.isdigit() and len(compact) < 6: continue seed.append( { "kind": item["kind"], "original": original, } ) return seed def run_case(case_dir: Path) -> dict: cfg_path = case_dir / "config_overlay.yml" cfg = load_dictionaries(cfg_path if cfg_path.exists() else None) source_text = (case_dir / "test.txt").read_text(encoding="utf-8") expected_text = normalize_text((case_dir / "expected.txt").read_text(encoding="utf-8")) expectations = load_expectations(case_dir) anon = anonymise_document_regex([source_text], [[]], cfg) actual_text = normalize_text(selective_rescan(anon.text_out, cfg)) audit = [ { "kind": hit.kind, "original": hit.original, "replacement": hit.placeholder, } for hit in anon.audit ] summary = { "kinds_present": sorted(set(item["kind"] for item in audit)), "kind_counts": dict(sorted(Counter(item["kind"] for item in audit).items())), "audit_len": len(audit), "leaks": SCANNER.scan_text(actual_text, build_leak_scan_seed(audit)), } case_actual_dir = ACTUAL_DIR / case_dir.name if case_actual_dir.exists(): shutil.rmtree(case_actual_dir) case_actual_dir.mkdir(parents=True, exist_ok=True) (case_actual_dir / "actual.txt").write_text(actual_text, encoding="utf-8") (case_actual_dir / "actual.audit.json").write_text( json.dumps(audit, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) (case_actual_dir / "actual.summary.json").write_text( json.dumps(summary, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) diff_lines = list( difflib.unified_diff( expected_text.splitlines(keepends=True), actual_text.splitlines(keepends=True), fromfile=f"{case_dir.name}/expected.txt", tofile=f"{case_dir.name}/actual.txt", ) ) (case_actual_dir / "diff.txt").write_text("".join(diff_lines), encoding="utf-8") failures = [] if actual_text != expected_text: failures.append("text_diff") if summary["leaks"]: failures.append("leak_detected") required_kinds = expectations.get("required_kinds", []) missing_kinds = sorted(kind for kind in required_kinds if kind not in summary["kinds_present"]) if missing_kinds: failures.append(f"missing_kinds:{','.join(missing_kinds)}") for required in expectations.get("must_contain", []): if required not in actual_text: failures.append(f"missing_text:{required}") for forbidden in expectations.get("must_not_contain", []): if forbidden in actual_text: failures.append(f"forbidden_text:{forbidden}") return { "case": case_dir.name, "failures": failures, "output_dir": str(case_actual_dir), } def main() -> int: parser = argparse.ArgumentParser(description="Exécuter le corpus synthétique de revue humaine") parser.add_argument( "--strict", action="store_true", help="Retourne un code non nul si un cas diffère de l'attendu.", ) args = parser.parse_args() ACTUAL_DIR.mkdir(parents=True, exist_ok=True) case_dirs = sorted(path for path in CASES_DIR.iterdir() if path.is_dir()) results = [run_case(case_dir) for case_dir in case_dirs] has_failures = False for result in results: if result["failures"]: has_failures = True print(f"[FAIL] {result['case']}: {', '.join(result['failures'])}") else: print(f"[OK] {result['case']}") print(f" -> {result['output_dir']}") if args.strict and has_failures: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())