# Listing metadata (repository-browser export, kept for provenance):
# ScanOGC_extract/generate_report.py — 2026-04-24 11:04:31 +02:00
# 684 lines · 30 KiB · Python
"""
Génération du bilan d'extraction OGC — MISTRAL
Usage : python3 generate_report.py
"""
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import cm
from reportlab.platypus import (
HRFlowable, PageBreak, Paragraph, SimpleDocTemplate, Spacer, Table,
TableStyle,
)
# ─── Config ───────────────────────────────────────────────────────────────────
BASE = Path(__file__).parent  # directory containing this script
OUTPUT = BASE / "output"  # all input JSON and the output PDF live here
MODEL = "ministral-3:8b-cloud"  # model identifier of the extraction run (appears unused here — kept for provenance)
LABEL = "MISTRAL"  # human-readable model label shown in the report
ACC = colors.HexColor("#6c3483")  # accent colour (purple) for headers and tables
JSON_PATH = OUTPUT / "extraction_ogc_raw.json"  # raw model extraction
CORRECTION_PATH = OUTPUT / "extraction_ogc_raw_Correction.json"  # human-corrected ground truth
TIMING_PATH = OUTPUT / "timing_stats.json"  # optional per-dossier timing statistics
REPORT_PATH = OUTPUT / "bilan_extraction_mistral_ogc.pdf"  # generated PDF report
# ─── Styles ───────────────────────────────────────────────────────────────────
def make_styles(acc):
    """Build the named ParagraphStyle palette used throughout the report.

    acc: accent colour applied to section headings.
    Returns a dict of style-name -> ParagraphStyle.
    """
    base = getSampleStyleSheet()

    def ps(name, parent, **kw):
        # Small factory: derive a style from one of the sample-sheet parents.
        return ParagraphStyle(name, parent=base[parent], **kw)

    hx = colors.HexColor
    return {
        "title": ps("title", "Title", fontSize=22, textColor=colors.white,
                    alignment=TA_LEFT),
        "subtitle": ps("subtitle", "Normal", fontSize=10,
                       textColor=hx("#aaaaaa"), alignment=TA_LEFT),
        "section": ps("section", "Heading2", fontSize=13, textColor=acc,
                      spaceBefore=16, spaceAfter=6),
        "body": ps("body", "Normal", fontSize=9, leading=14),
        "small": ps("small", "Normal", fontSize=8, textColor=hx("#444444")),
        "right": ps("right", "Normal", fontSize=7, textColor=hx("#888888"),
                    alignment=TA_RIGHT),
        "kpi_num": ps("kpi_num", "Normal", fontSize=36,
                      fontName="Helvetica-Bold", alignment=TA_CENTER),
        "kpi_lbl": ps("kpi_lbl", "Normal", fontSize=8,
                      textColor=hx("#777777"), alignment=TA_CENTER),
        "warn": ps("warn", "Normal", fontSize=8, textColor=hx("#c0392b")),
        "footnote": ps("footnote", "Normal", fontSize=7, textColor=hx("#888888")),
        "center": ps("center", "Normal", fontSize=9, alignment=TA_CENTER),
        "bold": ps("bold", "Normal", fontSize=9, fontName="Helvetica-Bold"),
        "th": ps("th", "Normal", fontSize=8, textColor=colors.white,
                 fontName="Helvetica-Bold"),
    }
# ─── Utilitaires ──────────────────────────────────────────────────────────────
def _fmt_s(s):
if s is None:
return ""
s = int(s)
h, r = divmod(s, 3600)
m, sec = divmod(r, 60)
if h:
return f"{h}h{m:02d}m{sec:02d}s"
if m:
return f"{m}m{sec:02d}s"
return f"{sec}s"
def _prec_color(p: float):
    """Colour for a precision percentage: green >= 90, orange >= 75, red below."""
    for threshold, hexcode in ((90, "#27ae60"), (75, "#e67e22")):
        if p >= threshold:
            return colors.HexColor(hexcode)
    return colors.HexColor("#e74c3c")
def _gravite_color(g: str):
    """Colour associated with an error-severity label; black for unknown labels."""
    palette = {
        "Critique": "#e74c3c",
        "Haute": "#e67e22",
        "Moyenne": "#f1c40f",
        "Faible": "#27ae60",
    }
    hexcode = palette.get(g)
    return colors.HexColor(hexcode) if hexcode else colors.black
# Short alias: TableStyle is instantiated many times below.
_TS = TableStyle


def _base_table_style(acc):
    """Shared table look: accent header row, zebra-striped body, light grid.

    Returns a list of TableStyle commands (callers append their own extras).
    """
    header = [
        ("BACKGROUND", (0, 0), (-1, 0), acc),
        ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
        ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
    ]
    body = [
        ("FONTSIZE", (0, 0), (-1, -1), 8),
        ("ROWBACKGROUNDS", (0, 1), (-1, -1),
         [colors.HexColor("#f8f9fa"), colors.white]),
        ("GRID", (0, 0), (-1, -1), 0.3, colors.HexColor("#cccccc")),
    ]
    padding = [
        (cmd, (0, 0), (-1, -1), pad)
        for cmd, pad in (("LEFTPADDING", 6), ("RIGHTPADDING", 6),
                         ("TOPPADDING", 4), ("BOTTOMPADDING", 4))
    ]
    return header + body + padding
# ─── Comparaison orig vs correction ───────────────────────────────────────────
def _flatten(d, prefix=""):
items = {}
if isinstance(d, dict):
for k, v in d.items():
key = f"{prefix}.{k}" if prefix else k
if isinstance(v, (dict, list)):
items.update(_flatten(v, key))
else:
items[key] = str(v).strip()
elif isinstance(d, list):
for i, v in enumerate(d):
key = f"{prefix}[{i}]"
if isinstance(v, (dict, list)):
items.update(_flatten(v, key))
else:
items[key] = str(v).strip()
return items
def _normalize_keys(flat: dict) -> dict:
"""Normalise les anciens noms de clés pour compatibilité avec les fichiers de correction antérieurs.
rang → niveau (renommage effectué en avril 2026).
"""
return {k.replace(".rang", ".niveau"): v for k, v in flat.items()}
def _get_cat(key: str, ptype: str = "") -> str:
k = key.lower()
if ptype == "ELEMENTS_PREUVE":
if any(x in k for x in ("medecin", "signataire", "date")):
return "Signataires / Dates"
return "Éléments de preuve"
if ptype in ("FICHE_ADMIN_2_2", "FICHE_ADMIN_1_2"):
if any(x in k for x in ("nom_medecin", "date_conc", "medecin")):
return "Signataires / Dates"
return "Concertation (2/2)"
if any(x in k for x in ("das_etab", "das_reco")):
return "DAS"
if any(x in k for x in ("sejour_etab", "sejour_reco")):
return "Données séjour"
if any(x in k for x in ("dp_etab", "dr_etab", "dp_reco", "dr_reco")):
return "DP / DR"
if any(x in k for x in ("rum_etab", "rum_reco")):
return "Données RUM"
if any(x in k for x in ("actes_etab", "actes_reco")):
return "Actes"
if any(x in k for x in ("ghm_", "ghs_")):
return "GHM / GHS"
if any(x in k for x in ("accord_desaccord", "se_coche", "atu", "ffm", "fsd")):
return "Accord / SE"
if any(x in k for x in ("date_debut", "date_fin", "nom_praticien")):
return "Signataires / Dates"
return "Métadonnées"
def compare_extractions(orig_list, corr_list):
    """Compare raw model extraction against the human-corrected extraction.

    Both arguments are lists of dossier records, each carrying a "fichier"
    name and a "pages_traitees" list of {page, type, data} entries.  Fields
    are compared flat (dotted keys from _flatten) after legacy-key
    normalisation (_normalize_keys).

    Returns a dict with global field counts, per-dossier / per-category /
    per-page-type precision, and heuristically detected error patterns.

    NOTE(review): the module's indentation was reconstructed from syntax;
    block nesting below follows the most plausible reading of the original.
    """
    orig_map = {r["fichier"]: r for r in orig_list}
    corr_map = {r["fichier"]: r for r in corr_list}
    total_g = correct_g = 0  # global field counters across all dossiers
    per_dossier = []
    per_cat = {}
    per_type = {}
    # Heuristic error-pattern counters: occurrence count + set of affected dossiers.
    ep_counters = {k: {"occ": 0, "dossiers": set()} for k in [
        "dr_confondu_das", "annee_mal_lue", "se_coche_halluc", "maintien_X",
        "provenance_halluc", "acte_dans_das", "das_manquant", "das_code_wrong",
        "json_non_parsable",
    ]}
    for fichier in sorted(orig_map):
        if fichier not in corr_map:
            # No correction available for this dossier — skip it entirely.
            continue
        o = orig_map[fichier]
        c = corr_map[fichier]
        # Index pages by (page number, page type) so both sides line up.
        o_pages = {(p["page"], p.get("type", "")): p for p in o.get("pages_traitees", [])}
        c_pages = {(p["page"], p.get("type", "")): p for p in c.get("pages_traitees", [])}
        # "raw_response" anywhere in the dossier marks unparsable model output.
        structural_error = "raw_response" in json.dumps(o)
        dos_total = dos_correct = 0
        for page_key in sorted(set(o_pages) & set(c_pages)):
            op = o_pages[page_key]
            cp = c_pages[page_key]
            ptype = op.get("type", "UNKNOWN")
            od = op.get("data", {})
            cd = cp.get("data", {})
            if not isinstance(od, dict) or not isinstance(cd, dict):
                continue
            if "raw_response" in od or "raw_response" in cd:
                # Unparsable page: count the pattern once, skip field comparison.
                ep_counters["json_non_parsable"]["occ"] += 1
                ep_counters["json_non_parsable"]["dossiers"].add(fichier)
                continue
            # Field-by-field comparison on the flattened, normalised dicts.
            o_flat = _normalize_keys(_flatten(od))
            c_flat = _normalize_keys(_flatten(cd))
            all_keys = set(o_flat) | set(c_flat)
            for k in all_keys:
                ov = o_flat.get(k, "")
                cv = c_flat.get(k, "")
                cat = _get_cat(k, ptype)
                per_cat.setdefault(cat, {"total": 0, "correct": 0})
                per_type.setdefault(ptype, {"total": 0, "correct": 0})
                per_cat[cat]["total"] += 1
                per_type[ptype]["total"] += 1
                dos_total += 1
                if ov == cv:
                    per_cat[cat]["correct"] += 1
                    per_type[ptype]["correct"] += 1
                    dos_correct += 1
            # ── Heuristic error-pattern detection on recueil pages ──
            if ptype == "FICHE_RECUEIL":
                # DR present in extraction but absent in correction → DR/DAS mix-up.
                dr = (od.get("dr_etab") or {}).get("code", "")
                cdr = (cd.get("dr_etab") or {}).get("code", "")
                if dr and not cdr:
                    ep_counters["dr_confondu_das"]["occ"] += 1
                    ep_counters["dr_confondu_das"]["dossiers"].add(fichier)
                # Value present where the corrected field is empty → hallucination.
                prov = str((od.get("sejour_etab") or {}).get("provenance", "")).strip()
                cprov = str((cd.get("sejour_etab") or {}).get("provenance", "")).strip()
                if prov and not cprov:
                    ep_counters["provenance_halluc"]["occ"] += 1
                    ep_counters["provenance_halluc"]["dossiers"].add(fichier)
                se = str(od.get("se_coche", "")).strip()
                cse = str(cd.get("se_coche", "")).strip()
                if se and not cse:
                    ep_counters["se_coche_halluc"]["occ"] += 1
                    ep_counters["se_coche_halluc"]["dossiers"].add(fichier)
                das = od.get("das_etab") or []
                cdas = cd.get("das_etab") or []
                dp = (od.get("dp_etab") or {}).get("code", "")
                # DP present but no usable DAS entry at all → DAS entirely missing.
                if dp and not [x for x in das if isinstance(x, dict) and x.get("code")]:
                    ep_counters["das_manquant"]["occ"] += 1
                    ep_counters["das_manquant"]["dossiers"].add(fichier)
                # Pairwise comparison of extracted vs corrected DAS codes.
                for od2, cd2 in zip(das, cdas):
                    if isinstance(od2, dict) and isinstance(cd2, dict):
                        if od2.get("code") != cd2.get("code") and cd2.get("code"):
                            oc = od2.get("code", "")
                            # >= 7 chars starting with 4 letters looks like a CCAM
                            # act code rather than a diagnosis → misplaced act.
                            if len(oc) >= 7 and oc[:4].isalpha():
                                ep_counters["acte_dans_das"]["occ"] += 1
                                ep_counters["acte_dans_das"]["dossiers"].add(fichier)
                            else:
                                ep_counters["das_code_wrong"]["occ"] += 1
                                ep_counters["das_code_wrong"]["dossiers"].add(fichier)
            if ptype == "FICHE_ADMIN_2_2":
                # "X" tick transcribed literally where the correction says "oui".
                m = str(od.get("maintien_avis_controleur", "")).strip()
                cm_ = str(cd.get("maintien_avis_controleur", "")).strip().lower()
                if m.upper() == "X" and cm_ == "oui":
                    ep_counters["maintien_X"]["occ"] += 1
                    ep_counters["maintien_X"]["dossiers"].add(fichier)
            # Year misread in any date-like field: r"1[6-9]" matches the trailing
            # two digits of 2016–2019 years inside the date string.
            # NOTE(review): placed at page level (applies to every page type) —
            # confirm against the original layout if available.
            for k in od:
                if "date" in k.lower():
                    ov = str(od.get(k, "")).strip()
                    cv = str(cd.get(k, "")).strip()
                    if ov != cv:
                        oy = re.findall(r"1[6-9]", ov)
                        cy = re.findall(r"1[6-9]", cv)
                        if oy and cy and oy != cy:
                            ep_counters["annee_mal_lue"]["occ"] += 1
                            ep_counters["annee_mal_lue"]["dossiers"].add(fichier)
        # Per-dossier precision summary.
        prec = round(dos_correct / dos_total * 100) if dos_total else 0
        per_dossier.append({
            "fichier": fichier.replace(".pdf", ""),
            "total": dos_total, "correct": dos_correct,
            "errors": dos_total - dos_correct,
            "precision": prec, "structural_error": structural_error,
        })
        total_g += dos_total
        correct_g += dos_correct
    prec_g = round(correct_g / total_g * 100, 1) if total_g else 0
    n_total = len(orig_list)
    # Build the human-readable pattern table (only patterns that actually occurred).
    error_patterns = []
    for desc, key, gravite in [
        ("DR confondu avec DAS", "dr_confondu_das", "Critique"),
        ("Année mal lue (ex : 2017 au lieu de 2018)", "annee_mal_lue", "Haute"),
        ("se_coche inventé ('1' ou '4' au lieu de vide)", "se_coche_halluc", "Haute"),
        ("maintien_avis = 'X' au lieu de 'oui'", "maintien_X", "Haute"),
        ("provenance inventé ('8' au lieu de vide)", "provenance_halluc", "Haute"),
        ("Code acte mis dans DAS", "acte_dans_das", "Haute"),
        ("DAS entier manquant", "das_manquant", "Critique"),
        ("DAS code mauvais", "das_code_wrong", "Critique"),
        ("JSON non parsable", "json_non_parsable", "Critique"),
    ]:
        e = ep_counters[key]
        if e["occ"] > 0:
            error_patterns.append({
                "desc": desc, "occ": e["occ"],
                "dossiers": len(e["dossiers"]), "n_total": n_total,
                "gravite": gravite,
            })
    return {
        "total": total_g, "correct": correct_g,
        "errors": total_g - correct_g, "precision": prec_g,
        "per_dossier": per_dossier, "per_cat": per_cat,
        "per_type": per_type, "error_patterns": error_patterns,
    }
# ─── Sections PDF ─────────────────────────────────────────────────────────────
def _section_header(story, S, acc, text):
    """Append a section title followed by a thin accent-coloured rule."""
    rule = HRFlowable(width="100%", thickness=0.5, color=acc, spaceAfter=6)
    story.append(Paragraph(text, S["section"]))
    story.append(rule)
def _build_header(story, S, acc, meta):
    """Append the coloured report banner (title + metadata line) to the story."""
    title_cell = Paragraph(f"BILAN D'EXTRACTION —\nMODÈLE {LABEL}", S["title"])
    meta_cell = Paragraph(meta, S["subtitle"])
    banner = Table([[title_cell, meta_cell]], colWidths=[10*cm, 7*cm])
    banner.setStyle(_TS([
        ("BACKGROUND", (0, 0), (-1, -1), acc),
        ("LEFTPADDING", (0, 0), (-1, -1), 16),
        ("RIGHTPADDING", (0, 0), (-1, -1), 12),
        ("TOPPADDING", (0, 0), (-1, -1), 16),
        ("BOTTOMPADDING", (0, 0), (-1, -1), 16),
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
    ]))
    story.append(banner)
    story.append(Spacer(1, 0.5*cm))
def _build_kpis(story, S, acc, cmp):
    """Append the five-KPI summary band (dossier/field counts + global precision)."""
    green = colors.HexColor("#27ae60")
    red = colors.HexColor("#e74c3c")
    dark = colors.HexColor("#333333")
    num_style = ParagraphStyle("kpi_num2", parent=S["kpi_num"], fontSize=28, leading=32)
    # (big number, caption, colour) for each KPI cell, left to right.
    data = [
        (str(len(cmp["per_dossier"])), "Dossiers analysés", acc),
        (str(cmp["total"]), "Champs comparés", dark),
        (str(cmp["correct"]), "Champs corrects", green),
        (str(cmp["errors"]), "Champs en erreur", red),
        (f"{cmp['precision']}%", "Précision globale",
         green if cmp["precision"] >= 85 else red),
    ]
    num_row = [Paragraph(f'<font color="{col.hexval()}">{num}</font>', num_style)
               for num, _, col in data]
    lbl_row = [Paragraph(lbl, S["kpi_lbl"]) for _, lbl, _ in data]
    band = Table([num_row, lbl_row], colWidths=[3.4*cm]*5)
    band.setStyle(_TS([
        ("BACKGROUND", (0, 0), (-1, -1), colors.HexColor("#f8f9fa")),
        ("BOX", (0, 0), (-1, -1), 0.5, colors.HexColor("#dddddd")),
        ("INNERGRID", (0, 0), (-1, -1), 0.3, colors.HexColor("#eeeeee")),
        ("TOPPADDING", (0, 0), (-1, 0), 14),
        ("BOTTOMPADDING", (0, 0), (-1, 0), 6),
        ("TOPPADDING", (0, 1), (-1, 1), 4),
        ("BOTTOMPADDING", (0, 1), (-1, 1), 12),
        ("ALIGN", (0, 0), (-1, -1), "CENTER"),
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
    ]))
    story.append(band)
    story.append(Spacer(1, 0.4*cm))
def _build_per_dossier(story, S, acc, W, cmp):
    """Append the per-dossier results table (one row per OGC dossier + TOTAL row).

    W: usable page width in points; column widths are fractions of it.
    """
    header = ["N° OGC", "Champs\ntotaux", "Champs\ncorrects", "Erreurs",
              "Précision", "Err. structurelle"]
    rows = [header]
    style_extra = []
    # i is 1-based so it matches the table row index (row 0 = header).
    for i, d in enumerate(cmp["per_dossier"], start=1):
        prec = d["precision"]
        rows.append([d["fichier"], str(d["total"]), str(d["correct"]),
                     str(d["errors"]), f"{prec}%",
                     "■ Oui" if d["structural_error"] else ""])
        pc = _prec_color(prec)
        # Colour precision, errors (red/green) and corrects (green) per row.
        style_extra += [
            ("TEXTCOLOR", (4, i), (4, i), pc),
            ("FONTNAME", (4, i), (4, i), "Helvetica-Bold"),
            ("TEXTCOLOR", (3, i), (3, i),
             colors.HexColor("#e74c3c") if d["errors"] > 0 else colors.HexColor("#27ae60")),
            ("FONTNAME", (3, i), (3, i), "Helvetica-Bold"),
            ("TEXTCOLOR", (2, i), (2, i), colors.HexColor("#27ae60")),
            ("FONTNAME", (2, i), (2, i), "Helvetica-Bold"),
        ]
        if d["structural_error"]:
            style_extra += [("TEXTCOLOR", (5, i), (5, i), colors.HexColor("#e74c3c")),
                            ("FONTNAME", (5, i), (5, i), "Helvetica-Bold")]
    # Summary (TOTAL) row, highlighted with a light-blue background.
    tot_prec = round(cmp["correct"] / cmp["total"] * 100, 1) if cmp["total"] else 0
    n_struct = sum(1 for d in cmp["per_dossier"] if d["structural_error"])
    rows.append(["TOTAL", str(cmp["total"]), str(cmp["correct"]),
                 str(cmp["errors"]), f"{tot_prec}%", f"{n_struct} dossier(s)"])
    n = len(rows)
    style_extra += [("BACKGROUND", (0, n-1), (-1, n-1), colors.HexColor("#eaf0fb")),
                    ("FONTNAME", (0, n-1), (-1, n-1), "Helvetica-Bold")]
    col_w = [W*0.16, W*0.12, W*0.14, W*0.11, W*0.13, W*0.34]
    t = Table([[Paragraph(str(c), S["th"] if i == 0 else S["small"]) for c in row]
               for i, row in enumerate(rows)], colWidths=col_w)
    t.setStyle(_TS(_base_table_style(acc) + style_extra))
    story.append(t)
def _build_per_cat(story, S, acc, W, cmp):
    """Append the precision-by-field-category table.

    W: usable page width in points.

    Bug fix: the original enumerated ``cat_order`` (start=1) to derive the
    styled row index, but ``continue``d over categories absent from
    ``cmp["per_cat"]`` without appending a row — so once any category was
    missing, every subsequent TEXTCOLOR/FONTNAME command targeted the wrong
    table row.  The row index is now taken from ``len(rows)`` right after the
    row is appended, so style commands always hit the row they belong to.
    """
    cat_order = [
        "DAS", "Données séjour", "DP / DR", "Données RUM",
        "Signataires / Dates", "Métadonnées", "Concertation (2/2)",
        "Éléments de preuve", "Accord / SE", "GHM / GHS", "Actes",
    ]
    rows = [["Catégorie", "Champs\ntotaux", "Champs\ncorrects", "Erreurs", "Précision"]]
    style_extra = []
    for cat in cat_order:
        d = cmp["per_cat"].get(cat)
        if not d:
            continue  # category absent from this run — no row, no style command
        prec = round(d["correct"] / d["total"] * 100) if d["total"] else 0
        rows.append([cat, str(d["total"]), str(d["correct"]),
                     str(d["total"] - d["correct"]), f"{prec}%"])
        i = len(rows) - 1  # index of the row just appended (header is row 0)
        style_extra += [("TEXTCOLOR", (4, i), (4, i), _prec_color(prec)),
                        ("FONTNAME", (4, i), (4, i), "Helvetica-Bold")]
    col_w = [W*0.40, W*0.15, W*0.15, W*0.15, W*0.15]
    t = Table([[Paragraph(str(c), S["th"] if i == 0 else S["small"]) for c in row]
               for i, row in enumerate(rows)], colWidths=col_w)
    t.setStyle(_TS(_base_table_style(acc) + style_extra))
    story.append(t)
def _build_per_type(story, S, acc, W, cmp):
    """Append the precision-by-page-type table, largest page types first."""
    rows = [["Type de page", "Champs\ntotaux", "Champs\ncorrects", "Erreurs", "Précision"]]
    style_extra = []
    ordered = sorted(cmp["per_type"].items(), key=lambda x: -x[1]["total"])
    for i, (ptype, d) in enumerate(ordered, start=1):
        prec = round(d["correct"] / d["total"] * 100) if d["total"] else 0
        # Turn the internal page-type identifier into a display label.
        label = (ptype.replace("FICHE_", "Fiche ").replace("_RECUEIL", "de recueil")
                 .replace("_ADMIN_", " administrative ").replace("_2_2", "2/2")
                 .replace("_1_2", "1/2").replace("ELEMENTS_PREUVE", "Éléments de preuve"))
        errors = d["total"] - d["correct"]
        rows.append([label, str(d["total"]), str(d["correct"]), str(errors), f"{prec}%"])
        style_extra.append(("TEXTCOLOR", (4, i), (4, i), _prec_color(prec)))
        style_extra.append(("FONTNAME", (4, i), (4, i), "Helvetica-Bold"))
    col_w = [W*0.40, W*0.15, W*0.15, W*0.15, W*0.15]
    cells = [[Paragraph(str(c), S["th"] if r == 0 else S["small"]) for c in row]
             for r, row in enumerate(rows)]
    t = Table(cells, colWidths=col_w)
    t.setStyle(_TS(_base_table_style(acc) + style_extra))
    story.append(t)
def _build_error_patterns(story, S, acc, W, cmp):
    """Append the recurring-error-pattern table (description, counts, severity)."""
    rows = [["Ce que le modèle a raté — catégorie d'erreur",
             "Occur-\nrences", "Sur combien\nde dossiers", "Gravité"]]
    style_extra = []
    for i, pat in enumerate(cmp["error_patterns"], start=1):
        rows.append([pat["desc"], str(pat["occ"]),
                     f"{pat['dossiers']} / {pat['n_total']}", pat["gravite"]])
        severity_color = _gravite_color(pat["gravite"])
        style_extra.append(("TEXTCOLOR", (3, i), (3, i), severity_color))
        style_extra.append(("FONTNAME", (3, i), (3, i), "Helvetica-Bold"))
    col_w = [W*0.56, W*0.10, W*0.17, W*0.17]
    cells = [[Paragraph(str(c), S["th"] if r == 0 else S["small"]) for c in row]
             for r, row in enumerate(rows)]
    t = Table(cells, colWidths=col_w)
    t.setStyle(_TS(_base_table_style(acc) + style_extra))
    story.append(t)
def _build_timing(story, S, acc, W, timing_data):
    """Append the timing section: global KPIs, per-dossier table, issue details.

    timing_data: parsed timing_stats.json (list of per-dossier dicts) or None.
    W: usable page width in points.
    """
    if not timing_data:
        story.append(Paragraph(
            "Aucune donnée temporelle disponible. "
            "Relancez l'extraction pour générer timing_stats.json.", S["small"]))
        return
    # ── Global aggregates across all dossiers ──
    total_s = sum(t.get("duree_totale_s") or 0 for t in timing_data)
    total_pages = sum(t.get("nb_pages_total") or 0 for t in timing_data)
    total_err = sum(len(t.get("erreurs", [])) for t in timing_data)
    total_429 = sum(len(t.get("blocages_429", [])) for t in timing_data)
    total_wait = sum(b["attente_s"] for t in timing_data for b in t.get("blocages_429", []))
    n_dos = len(timing_data)
    story.append(Paragraph("Résumé global", S["bold"]))
    story.append(Spacer(1, 0.2*cm))
    kpi_rows = [
        ["Durée totale d'extraction", _fmt_s(total_s)],
        ["Durée moyenne / dossier", _fmt_s(total_s / n_dos) if n_dos else ""],
        ["Durée moyenne / page", _fmt_s(total_s / total_pages) if total_pages else ""],
        ["Pages traitées", str(total_pages)],
        ["Erreurs totales", str(total_err)],
        ["Blocages rate limit (429)", str(total_429)],
        ["Temps perdu en attentes 429", _fmt_s(total_wait)],
        ["Temps utile (hors 429)", _fmt_s(total_s - total_wait)],
    ]
    style_kpi = _base_table_style(acc) + [
        ("ALIGN", (1, 0), (1, -1), "CENTER"),
        ("TEXTCOLOR", (1, 0), (1, -1), acc),
        ("FONTNAME", (1, 0), (1, -1), "Helvetica-Bold"),
    ]
    t_kpi = Table([[Paragraph(k, S["small"]), Paragraph(v, S["small"])]
                   for k, v in kpi_rows], colWidths=[W*0.6, W*0.4])
    t_kpi.setStyle(_TS(style_kpi))
    story.append(t_kpi)
    story.append(Spacer(1, 0.4*cm))
    # ── Per-dossier detail table ──
    story.append(Paragraph("Détail par dossier", S["bold"]))
    story.append(Spacer(1, 0.2*cm))
    header = ["Dossier", "Début", "Fin", "Durée", "Pages",
              "Erreurs", "Blocages\n429", "Attente\n429"]
    rows = [header]
    style_dos = _base_table_style(acc)
    for i, t in enumerate(timing_data, start=1):
        # Keep only "YYYY-MM-DD HH:MM" from the ISO timestamps.
        debut = (t.get("debut") or "")[:16].replace("T", " ")
        fin = (t.get("fin") or "")[:16].replace("T", " ")
        n_err = len(t.get("erreurs", []))
        n_b = len(t.get("blocages_429", []))
        att = sum(b["attente_s"] for b in t.get("blocages_429", []))
        rows.append([
            t["fichier"].replace(".pdf", ""), debut, fin,
            _fmt_s(t.get("duree_totale_s")), str(t.get("nb_pages_total", "")),
            str(n_err), str(n_b), _fmt_s(att) if att else "",
        ])
        # Highlight non-zero error / rate-limit counts for this row.
        if n_err > 0:
            style_dos += [("TEXTCOLOR", (5, i), (5, i), colors.HexColor("#e74c3c")),
                          ("FONTNAME", (5, i), (5, i), "Helvetica-Bold")]
        if n_b > 0:
            style_dos += [("TEXTCOLOR", (6, i), (6, i), colors.HexColor("#e67e22")),
                          ("FONTNAME", (6, i), (6, i), "Helvetica-Bold")]
    col_w = [W*0.18, W*0.14, W*0.14, W*0.10, W*0.08, W*0.09, W*0.10, W*0.17]
    t_dos = Table([[Paragraph(str(c), S["th"] if i == 0 else S["small"]) for c in row]
                   for i, row in enumerate(rows)], colWidths=col_w)
    t_dos.setStyle(_TS(style_dos))
    story.append(t_dos)
    # ── Detailed list of errors and 429 blocks, only when any occurred ──
    has_issues = any(t.get("erreurs") or t.get("blocages_429") for t in timing_data)
    if has_issues:
        story.append(Spacer(1, 0.4*cm))
        story.append(Paragraph("Erreurs et blocages détaillés", S["bold"]))
        story.append(Spacer(1, 0.2*cm))
        for t in timing_data:
            if not t.get("erreurs") and not t.get("blocages_429"):
                continue
            story.append(Paragraph(t["fichier"].replace(".pdf", ""), S["bold"]))
            for err in t.get("erreurs", []):
                # NOTE(review): no separator between {page} and {phase} in the
                # output string — possibly intentional if phase carries its own
                # prefix; confirm before changing.
                story.append(Paragraph(
                    f" ⚠ Page {err['page']}{err['phase']} : {err['message'][:100]}",
                    S["warn"]))
            for b in t.get("blocages_429", []):
                story.append(Paragraph(
                    f" ⏳ Blocage 429 — tentative {b['tentative']}, "
                    f"attente {b['attente_s']}s à {str(b.get('ts',''))[:16].replace('T',' ')}",
                    ParagraphStyle("b429", parent=S["small"],
                                   textColor=colors.HexColor("#e67e22"))))
            story.append(Spacer(1, 0.1*cm))
# ─── Main builder ─────────────────────────────────────────────────────────────
def build_pdf():
    """Load the extraction / correction / timing JSON files and build the PDF.

    Exits with status 1 if the raw extraction JSON is missing; the correction
    and timing files are optional (sections are skipped when absent).
    """
    W = A4[0] - 4*cm  # usable width with 2 cm margins on each side
    if not JSON_PATH.exists():
        print(f"⚠ JSON introuvable : {JSON_PATH}")
        sys.exit(1)
    with open(JSON_PATH, encoding="utf-8") as f:
        orig_data = json.load(f)
    # Precision metrics require the human-corrected file; optional.
    cmp = None
    if CORRECTION_PATH.exists():
        with open(CORRECTION_PATH, encoding="utf-8") as f:
            corr_data = json.load(f)
        cmp = compare_extractions(orig_data, corr_data)
    timing_data = None
    if TIMING_PATH.exists():
        with open(TIMING_PATH, encoding="utf-8") as f:
            timing_data = json.load(f)
    S = make_styles(ACC)
    story = []
    if cmp:
        # Pull establishment metadata from the first page that provides it.
        etabl = finess = controle = ""
        for r in orig_data:
            for pt in r.get("pages_traitees", []):
                d = pt.get("data", {})
                if d.get("etablissement"):
                    etabl = d["etablissement"]
                if d.get("finess"):
                    finess = d["finess"]
                if d.get("date_debut_controle"):
                    controle = d["date_debut_controle"]
                if etabl and finess and controle:
                    break
            if etabl:
                break
        meta = (f"{etabl} · FINESS {finess}\n"
                f"{len(orig_data)} dossiers OGC · Contrôle {controle} · "
                f"{datetime.now().strftime('%B %Y').capitalize()}")
    else:
        meta = (f"{len(orig_data)} dossiers OGC\n"
                f"Généré le {datetime.now().strftime('%d/%m/%Y à %H:%M')}")
    _build_header(story, S, ACC, meta)
    # Sections 1–5 exist only when correction data is available; the timing
    # section number shifts accordingly.
    if cmp:
        _section_header(story, S, ACC, "1. Indicateurs globaux")
        _build_kpis(story, S, ACC, cmp)
        _section_header(story, S, ACC, "2. Résultats par dossier OGC")
        _build_per_dossier(story, S, ACC, W, cmp)
        story.append(Spacer(1, 0.4*cm))
        _section_header(story, S, ACC, "3. Précision par catégorie de champ")
        _build_per_cat(story, S, ACC, W, cmp)
        story.append(Spacer(1, 0.4*cm))
        _section_header(story, S, ACC, "4. Précision par type de page")
        _build_per_type(story, S, ACC, W, cmp)
        story.append(Spacer(1, 0.4*cm))
        if cmp["error_patterns"]:
            _section_header(story, S, ACC, "5. Patterns d'erreurs récurrents")
            _build_error_patterns(story, S, ACC, W, cmp)
            story.append(Spacer(1, 0.4*cm))
        sec_timing = 6
    else:
        sec_timing = 1
    # NOTE(review): PageBreak placed at function level (both branches) per the
    # reconstructed indentation — confirm against the original layout.
    story.append(PageBreak())
    _section_header(story, S, ACC, f"{sec_timing}. Analyse temporelle")
    _build_timing(story, S, ACC, W, timing_data)
    story.append(Spacer(1, 0.5*cm))
    # Footnote describing how the report was produced.
    note = (
        "Rapport généré par comparaison automatique de extraction_ogc_raw.json "
        "vs extraction_ogc_raw_Correction.json · "
        f"Périmètre : {len(orig_data)} dossiers OGC · "
        "Les pourcentages de précision sont calculés champ par champ."
        if cmp else
        f"Rapport généré automatiquement · {len(orig_data)} dossiers OGC · "
        "Aucun fichier de correction disponible — métriques de précision non calculées."
    )
    story.append(HRFlowable(width="100%", thickness=0.3, color=colors.grey))
    story.append(Paragraph(note, S["footnote"]))
    doc = SimpleDocTemplate(
        str(REPORT_PATH), pagesize=A4,
        leftMargin=2*cm, rightMargin=2*cm,
        topMargin=2*cm, bottomMargin=2*cm,
        title=f"Bilan extraction OGC — {LABEL}",
        author="EttaSanté / T2A",
    )
    doc.build(story)
    print(f"{REPORT_PATH}")
# Script entry point: announce the run, then build the report PDF.
if __name__ == "__main__":
    print(f"Génération bilan {LABEL}...")
    build_pdf()