"""Annote les JSONs V2 existants avec la validation ATIH. Utile pour ajouter la validation sans relancer l'extraction complète. Produit aussi un rapport agrégé en markdown. """ import json from collections import defaultdict from pathlib import Path from pipeline.validation import annotate OUT_DIR = Path("output/v2") REPORT = Path("validation_report.md") def annotate_all() -> list[dict]: """Annote chaque JSON et écrit le résultat en place (avec _validation).""" results = [] for p in sorted(OUT_DIR.glob("OGC *.json")): data = json.loads(p.read_text(encoding="utf-8")) annotated = annotate(data) p.write_text(json.dumps(annotated, ensure_ascii=False, indent=2), encoding="utf-8") results.append(annotated) rec_v = annotated.get("extraction", {}).get("recueil", {}).get("_validation", {}) s = rec_v.get("summary", {}) cc = rec_v.get("cross_checks", {}) print(f" {data['fichier']:8s} — valid={s.get('valid',0):2d} invalid={s.get('invalid',0):2d} " f"empty={s.get('empty',0):2d} incoherent={s.get('ghm_ghs_incoherents',0)} " f"etab={cc.get('etab',{}).get('coherent','?')} reco={cc.get('reco',{}).get('coherent','?')}") return results def build_report(results: list[dict]): """Agrégation par champ : taux de validité, suggestions les plus fréquentes.""" per_field = defaultdict(lambda: {"total": 0, "valid": 0, "invalid": 0, "empty": 0, "suggestions": []}) incoherences = [] for d in results: name = d["fichier"] rec_v = d.get("extraction", {}).get("recueil", {}).get("_validation", {}) if not rec_v: continue # Codes unitaires for key in ["ghm_etab", "ghs_etab", "ghm_reco", "ghs_reco"]: entry = rec_v.get(key, {}) st = per_field[key] st["total"] += 1 if entry.get("valid") is True: st["valid"] += 1 elif entry.get("valid") is False: st["invalid"] += 1 if "suggestion" in entry: st["suggestions"].append((name, entry["code"], entry["suggestion"])) else: st["empty"] += 1 # Codage etab / reco : dp + dr + das for section in ["codage_etab", "codage_reco"]: sec = rec_v.get(section, {}) for sub in ["dp", "dr"]: entry = sec.get(sub, {}) st = per_field[f"{section}.{sub}"] st["total"] += 1 if entry.get("valid") is True: st["valid"] += 1 elif entry.get("valid") is False: st["invalid"] += 1 if "suggestion" in entry: st["suggestions"].append((name, entry["code"], entry["suggestion"])) else: st["empty"] += 1 for das in sec.get("das", []) or []: st = per_field[f"{section}.das"] st["total"] += 1 if das.get("valid") is True: st["valid"] += 1 elif das.get("valid") is False: st["invalid"] += 1 if "suggestion" in das: st["suggestions"].append((name, das["code"], das["suggestion"])) else: st["empty"] += 1 # Cohérence GHM ↔ GHS for side in ["etab", "reco"]: cc = rec_v.get("cross_checks", {}).get(side, {}) if cc.get("checked") and not cc.get("coherent"): incoherences.append({ "dossier": name, "side": side, "ghs_extrait": cc.get("ghs_extrait"), "ghs_possibles": cc.get("ghs_possibles"), }) # Markdown report lines = ["# Rapport de validation ATIH — V2 (18 dossiers)\n"] lines.append("## Couverture et validité par champ\n") lines.append("| Champ | Total | Valid | Invalid | Vide | Validité codes renseignés |") lines.append("|---|---:|---:|---:|---:|---:|") for f, st in per_field.items(): renseignes = st["valid"] + st["invalid"] ratio = (100 * st["valid"] / renseignes) if renseignes else 0 lines.append(f"| `{f}` | {st['total']} | {st['valid']} | {st['invalid']} | {st['empty']} | {ratio:.0f}% |") # Suggestions OCR lines.append("\n## Corrections OCR suggérées (Levenshtein ≤ 1)") lines.append("\nCodes extraits invalides mais ressemblant à un code ATIH existant :\n") lines.append("| Dossier | Champ | Code extrait | Suggestion |") lines.append("|---|---|---|---|") sugg_count = 0 for field, st in per_field.items(): for name, code, sug in st["suggestions"]: lines.append(f"| {name} | `{field}` | `{code}` | **`{sug}`** |") sugg_count += 1 if sugg_count == 0: lines.append("| — | — | — | Aucune suggestion (pas de correction Levenshtein ≤ 1) |") # Incohérences GHM ↔ GHS lines.append("\n## Incohérences GHM ↔ GHS détectées\n") if incoherences: lines.append("| Dossier | Côté | GHS extrait | GHS possibles pour le GHM |") lines.append("|---|---|---|---|") for inc in incoherences: lines.append(f"| {inc['dossier']} | {inc['side']} | `{inc['ghs_extrait']}` | {inc['ghs_possibles']} |") else: lines.append("✓ Aucune incohérence détectée sur les GHM/GHS extraits.") lines.append(f"\n## Synthèse\n") total_codes = sum(st["valid"] + st["invalid"] for st in per_field.values()) total_valid = sum(st["valid"] for st in per_field.values()) lines.append(f"- **{total_valid}/{total_codes} codes valides** ({100*total_valid/total_codes:.1f}%)") lines.append(f"- **{sugg_count} suggestions de correction OCR** trouvées automatiquement") lines.append(f"- **{len(incoherences)} incohérences GHM↔GHS** sur les paires extraites") REPORT.write_text("\n".join(lines), encoding="utf-8") print(f"\nRapport → {REPORT}") if __name__ == "__main__": print("Annotation en place des JSONs V2 + calcul validation ATIH...\n") results = annotate_all() build_report(results)