Le pipeline produit un JSON riche pendant l'exécution (ratios
checkbox, OCR raw, flags _parse_error/_truncated_loop/_crop_recodage,
_source, _elapsed_s…). Utile en audit, mais pollue quand on veut
exposer le résultat à un consommateur aval (Excel, dashboard, API).
pipeline/schema.py :
- SCHEMA_VERSION "2.0"
- clean_dossier(raw) : retourne une copie propre avec structure stable
(en-tête → codage → GHM/GHS → décisions) et validation ATIH en
format compact (summary + cross_checks + flags par champ).
- CLEAN_FIELDS_RECUEIL / CLEAN_FIELDS_CONCERTATION_{1,2} / CLEAN_FIELDS_PREUVES
documentent les champs stables par type de page.
- CLI : `python -m pipeline.schema` → nettoie `output/v2/*.json` vers
`output/v2_clean/`.
Séparation claire : `output/v2/` reste le JSON raw (audit), `output/v2_clean/`
est la sortie propre et stable pour livrables.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
186 lines
6.8 KiB
Python
186 lines
6.8 KiB
Python
"""Schema de sortie stable du pipeline + fonction de nettoyage.
|
|
|
|
Le pipeline produit un JSON riche pendant l'exécution (avec des champs de debug :
|
|
ratios checkbox, OCR raw, flags _parse_error, _truncated_loop, _crop_recodage,
|
|
_checkbox_debug, _source, etc). Cette information est utile pour auditer un
|
|
dossier mais pollue la structure quand on veut exposer le résultat à un
|
|
consommateur aval (Excel, dashboard, échange inter-équipes).
|
|
|
|
Ce module expose :
|
|
- `clean_dossier(raw)` : retourne une version propre, lisible et stable,
|
|
sans champs de debug. Garde les flags de validation ATIH qui ont une valeur
|
|
métier (codes valides, cohérence GHM↔GHS).
|
|
- `SCHEMA_VERSION` : version du format (incrémentée à chaque breaking
|
|
change de structure).
|
|
- `CLEAN_FIELDS_RECUEIL` : liste des champs finaux de la page recueil
|
|
(utile pour Excel, dashboard, docs).
|
|
|
|
Principe : le JSON raw reste dans `output/v2/<nom>.json` (audit complet), le
|
|
JSON clean est produit séparément sur demande via `clean_dossier()`.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from copy import deepcopy
|
|
from typing import Any
|
|
|
|
SCHEMA_VERSION = "2.0"
|
|
|
|
# Champs retenus sur la page recueil pour la sortie propre. L'ordre est
|
|
# celui de l'affichage logique (en-tête → séjour → codage → GHM/GHS → décisions).
|
|
CLEAN_FIELDS_RECUEIL = [
|
|
"etablissement", "finess", "date_debut_controle",
|
|
"n_ogc", "n_champ", "dates_sejour",
|
|
"sejour_etab", "sejour_reco", "rum_etab",
|
|
"codage_etab", "codage_reco",
|
|
"actes_etab", "actes_reco",
|
|
"ghm_etab", "ghs_etab", "ghm_reco", "ghs_reco",
|
|
"recodage_impactant", "ghs_injustifie",
|
|
"accord_desaccord", "praticien_conseil",
|
|
]
|
|
|
|
CLEAN_FIELDS_CONCERTATION_2 = [
|
|
"ghs_initial", "ghs_avant_concertation", "ghs_final",
|
|
"decision", "date_concertation",
|
|
"praticien_controleur", "medecin_dim",
|
|
]
|
|
|
|
CLEAN_FIELDS_CONCERTATION_1 = [
|
|
"date_concertation", "argumentaire",
|
|
]
|
|
|
|
CLEAN_FIELDS_PREUVES = [
|
|
"date", "praticien_controleur", "medecin_dim", "pieces",
|
|
]
|
|
|
|
|
|
# Champs de debug à retirer systématiquement du clean
|
|
DEBUG_FIELDS = {
|
|
"_checkbox_debug",
|
|
"_crop_recodage",
|
|
"_parse_error",
|
|
"_raw",
|
|
"_truncated_loop",
|
|
"_source",
|
|
"_elapsed_s",
|
|
"_n_codes_raw",
|
|
"_n_codes_kept",
|
|
}
|
|
|
|
|
|
def _pick(d: dict, keys: list[str]) -> dict:
|
|
"""Retourne un dict ordonné avec uniquement les clés présentes."""
|
|
out = {}
|
|
for k in keys:
|
|
if k in d:
|
|
out[k] = d[k]
|
|
return out
|
|
|
|
|
|
def _clean_validation(validation: dict | None) -> dict | None:
|
|
"""Garde la validation ATIH mais en format compact : juste les flags utiles."""
|
|
if not isinstance(validation, dict):
|
|
return None
|
|
summary = validation.get("summary") or {}
|
|
cc = validation.get("cross_checks") or {}
|
|
# On conserve juste l'essentiel : par champ, le flag valid (True/False/None)
|
|
# et éventuellement la suggestion de correction OCR.
|
|
def _compact_code(entry):
|
|
if not isinstance(entry, dict) or "valid" not in entry:
|
|
return None
|
|
out = {"valid": entry.get("valid")}
|
|
if entry.get("suggestion"):
|
|
out["suggestion"] = entry["suggestion"]
|
|
if entry.get("libelle_ref"):
|
|
out["libelle_ref"] = entry["libelle_ref"]
|
|
return out
|
|
|
|
result = {
|
|
"summary": summary,
|
|
"codage_etab": {
|
|
"dp": _compact_code(validation.get("codage_etab", {}).get("dp")),
|
|
"dr": _compact_code(validation.get("codage_etab", {}).get("dr")),
|
|
"das": [_compact_code(d) for d in validation.get("codage_etab", {}).get("das", []) or []],
|
|
},
|
|
"codage_reco": {
|
|
"dp": _compact_code(validation.get("codage_reco", {}).get("dp")),
|
|
"dr": _compact_code(validation.get("codage_reco", {}).get("dr")),
|
|
"das": [_compact_code(d) for d in validation.get("codage_reco", {}).get("das", []) or []],
|
|
},
|
|
"ghm_etab": _compact_code(validation.get("ghm_etab")),
|
|
"ghs_etab": _compact_code(validation.get("ghs_etab")),
|
|
"ghm_reco": _compact_code(validation.get("ghm_reco")),
|
|
"ghs_reco": _compact_code(validation.get("ghs_reco")),
|
|
"cross_checks": {
|
|
"etab_ghm_ghs_coherent": cc.get("etab", {}).get("coherent"),
|
|
"reco_ghm_ghs_coherent": cc.get("reco", {}).get("coherent"),
|
|
},
|
|
}
|
|
return result
|
|
|
|
|
|
def _clean_recueil(page: dict) -> dict:
|
|
cleaned = _pick(page, CLEAN_FIELDS_RECUEIL)
|
|
# Sous-champs codage : nettoyer aussi les codes invalides
|
|
v = _clean_validation(page.get("_validation"))
|
|
if v:
|
|
cleaned["_validation"] = v
|
|
return cleaned
|
|
|
|
|
|
def _clean_simple(page: dict, fields: list[str]) -> dict:
|
|
cleaned = _pick(page, fields)
|
|
v = page.get("_validation")
|
|
if isinstance(v, dict):
|
|
cleaned["_validation"] = v # déjà compact pour ces pages
|
|
return cleaned
|
|
|
|
|
|
def clean_dossier(raw: dict) -> dict:
|
|
"""Retourne une copie nettoyée d'un résultat de pipeline.
|
|
|
|
Strippe les champs de debug internes, garde la validation ATIH compacte
|
|
et une structure stable.
|
|
"""
|
|
extraction = raw.get("extraction") or {}
|
|
clean_extraction: dict[str, Any] = {}
|
|
|
|
if "recueil" in extraction and isinstance(extraction["recueil"], dict):
|
|
clean_extraction["recueil"] = _clean_recueil(extraction["recueil"])
|
|
if "concertation_2" in extraction and isinstance(extraction["concertation_2"], dict):
|
|
clean_extraction["concertation_2"] = _clean_simple(
|
|
extraction["concertation_2"], CLEAN_FIELDS_CONCERTATION_2)
|
|
if "concertation_1" in extraction and isinstance(extraction["concertation_1"], dict):
|
|
clean_extraction["concertation_1"] = _clean_simple(
|
|
extraction["concertation_1"], CLEAN_FIELDS_CONCERTATION_1)
|
|
if "preuves" in extraction and isinstance(extraction["preuves"], dict):
|
|
clean_extraction["preuves"] = _clean_simple(
|
|
extraction["preuves"], CLEAN_FIELDS_PREUVES)
|
|
|
|
return {
|
|
"fichier": raw.get("fichier"),
|
|
"pdf_hash": raw.get("pdf_hash"),
|
|
"schema_version": SCHEMA_VERSION,
|
|
"extraction": clean_extraction,
|
|
"_meta": raw.get("_meta", {}),
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Utilitaire : nettoyer un fichier en place, ou produire une version clean
|
|
import json, sys, glob
|
|
from pathlib import Path
|
|
|
|
if len(sys.argv) > 1:
|
|
paths = [Path(p) for p in sys.argv[1:]]
|
|
else:
|
|
paths = [Path(p) for p in sorted(glob.glob("output/v2/OGC *.json"))]
|
|
|
|
out_dir = Path("output/v2_clean")
|
|
out_dir.mkdir(exist_ok=True)
|
|
for p in paths:
|
|
raw = json.loads(p.read_text(encoding="utf-8"))
|
|
clean = clean_dossier(raw)
|
|
(out_dir / p.name).write_text(
|
|
json.dumps(clean, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(f"{len(paths)} fichiers nettoyés → {out_dir}/")
|