Compare commits
3 Commits
b47f5c47e0
...
1255468676
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1255468676 | ||
|
|
c0b0cd9b87 | ||
|
|
6c8184cc03 |
125
pipeline/deskew.py
Normal file
125
pipeline/deskew.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Détection d'angle de skew + redressement automatique des pages scannées.
|
||||
|
||||
Technique : Hough Transform sur les lignes détectées par Canny, puis moyenne
|
||||
des angles des lignes « quasi horizontales » (±15° par rapport à l'horizontale).
|
||||
Les fiches OGC ont énormément de traits de tableau → signal très fort.
|
||||
|
||||
Seuil : on ne corrige que si |angle| > `MIN_ANGLE_DEG` (0.3° par défaut) pour
|
||||
éviter de toucher les scans déjà bien cadrés et introduire du bruit inutile.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
try:
|
||||
import cv2 # type: ignore
|
||||
_HAS_CV2 = True
|
||||
except ImportError:
|
||||
_HAS_CV2 = False
|
||||
|
||||
|
||||
MIN_ANGLE_DEG = 0.3 # en-dessous, on ne corrige pas
|
||||
MAX_ANGLE_DEG = 10.0 # au-dessus, c'est anormal → suspect, on ne corrige pas
|
||||
NEAR_HORIZONTAL_BAND = 15.0 # degrés : bande autour de l'horizontale pour filtrer
|
||||
|
||||
|
||||
def detect_skew_angle(img: Image.Image) -> float:
|
||||
"""Retourne l'angle de skew en degrés (positif = tourné dans le sens
|
||||
des aiguilles d'une montre) à appliquer pour redresser l'image.
|
||||
|
||||
Si aucune ligne horizontale n'est trouvée, retourne 0.0.
|
||||
Si l'angle détecté est hors [-MAX_ANGLE_DEG, +MAX_ANGLE_DEG], retourne 0.0
|
||||
(probablement une erreur de détection, on ne corrige pas).
|
||||
"""
|
||||
if not _HAS_CV2:
|
||||
return 0.0
|
||||
gray = np.array(img.convert("L"))
|
||||
# Réduire l'image pour accélérer (max 1500 px de large)
|
||||
h, w = gray.shape
|
||||
if w > 1500:
|
||||
scale = 1500 / w
|
||||
gray = cv2.resize(gray, (1500, int(h * scale)), interpolation=cv2.INTER_AREA)
|
||||
|
||||
# Canny edges — paramètres standards documents
|
||||
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
|
||||
# Hough Lines probabiliste : rapide et robuste
|
||||
lines = cv2.HoughLinesP(
|
||||
edges, rho=1, theta=np.pi / 180, threshold=200,
|
||||
minLineLength=gray.shape[1] // 4, # au moins 25% de la largeur
|
||||
maxLineGap=20,
|
||||
)
|
||||
if lines is None or len(lines) == 0:
|
||||
return 0.0
|
||||
|
||||
# Calculer l'angle de chaque ligne en degrés
|
||||
angles = []
|
||||
for line in lines:
|
||||
x1, y1, x2, y2 = line[0]
|
||||
if x2 == x1:
|
||||
continue # ligne verticale, ignorée
|
||||
angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
|
||||
# On ne garde que les lignes proches de l'horizontale
|
||||
if abs(angle) < NEAR_HORIZONTAL_BAND:
|
||||
angles.append(angle)
|
||||
|
||||
if not angles:
|
||||
return 0.0
|
||||
|
||||
# Moyenne robuste : médiane plutôt que mean, moins sensible aux outliers
|
||||
angle = float(np.median(angles))
|
||||
if abs(angle) > MAX_ANGLE_DEG:
|
||||
return 0.0 # suspect → on ne corrige pas
|
||||
return angle
|
||||
|
||||
|
||||
def deskew_image(img: Image.Image,
|
||||
angle: float | None = None,
|
||||
min_angle: float = MIN_ANGLE_DEG) -> Tuple[Image.Image, float]:
|
||||
"""Redresse une image si le skew détecté dépasse `min_angle`.
|
||||
|
||||
Retourne (image_eventuellement_rotee, angle_applique).
|
||||
Si |angle| < min_angle, retourne l'image inchangée et angle=0.0.
|
||||
"""
|
||||
if angle is None:
|
||||
angle = detect_skew_angle(img)
|
||||
if abs(angle) < min_angle:
|
||||
return img, 0.0
|
||||
# PIL.Image.rotate : positive angle = counter-clockwise
|
||||
# detect_skew retourne positif = clockwise → on inverse pour PIL
|
||||
rotated = img.rotate(
|
||||
angle,
|
||||
resample=Image.Resampling.BICUBIC,
|
||||
expand=False,
|
||||
fillcolor="white",
|
||||
)
|
||||
return rotated, angle
|
||||
|
||||
|
||||
def deskew_file(src: Path, dst: Path | None = None,
|
||||
min_angle: float = MIN_ANGLE_DEG) -> float:
|
||||
"""Version fichier → fichier. Écrase `src` si `dst` est None.
|
||||
Retourne l'angle appliqué (0.0 si pas de rotation)."""
|
||||
img = Image.open(src)
|
||||
rotated, angle = deskew_image(img, min_angle=min_angle)
|
||||
out = dst or src
|
||||
rotated.save(out, "PNG", optimize=True)
|
||||
return angle
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
import glob
|
||||
paths = [Path(p) for p in (sys.argv[1:] or sorted(glob.glob(".cache/images/*/page_01.png")))]
|
||||
print(f"Deskew sur {len(paths)} images (seuil={MIN_ANGLE_DEG}°)...")
|
||||
total_corrected = 0
|
||||
for p in paths:
|
||||
angle = detect_skew_angle(Image.open(p))
|
||||
mark = "→" if abs(angle) >= MIN_ANGLE_DEG else "·"
|
||||
if abs(angle) >= MIN_ANGLE_DEG:
|
||||
total_corrected += 1
|
||||
print(f" {mark} {p} : {angle:+.2f}°")
|
||||
print(f"\n{total_corrected}/{len(paths)} images auraient besoin d'un redressement.")
|
||||
@@ -11,7 +11,8 @@ from .prompts import (
|
||||
PAGE_TYPES, PROMPT_HEADER,
|
||||
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
|
||||
)
|
||||
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie
|
||||
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie, CheckboxZones
|
||||
from .zones_config import load_config, get_zone
|
||||
from .validation import annotate as validate_annotate
|
||||
|
||||
|
||||
@@ -108,6 +109,23 @@ def parse_json_output(raw: str) -> dict | None:
|
||||
return {"_raw": raw, "_parse_error": str(e)}
|
||||
|
||||
|
||||
def _recueil_zones() -> tuple[tuple, CheckboxZones]:
|
||||
"""Charge les zones configurables pour la page recueil.
|
||||
|
||||
Retourne (recodage_zone, accord_desaccord_zones). Si la config n'a pas
|
||||
d'entrée, on retombe sur les constantes compilées.
|
||||
"""
|
||||
cfg = load_config()
|
||||
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
|
||||
acc = get_zone("recueil", "accord_checkbox", cfg)
|
||||
des = get_zone("recueil", "desaccord_checkbox", cfg)
|
||||
if acc and des:
|
||||
cb = CheckboxZones(accord=acc, desaccord=des)
|
||||
else:
|
||||
cb = RECUEIL_ACCORD_DESACCORD
|
||||
return reco, cb
|
||||
|
||||
|
||||
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
|
||||
"""Second passage VLM sur le crop zonal de la colonne Recodage.
|
||||
|
||||
@@ -122,7 +140,8 @@ def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
|
||||
try:
|
||||
img = Image.open(image_path)
|
||||
w, h = img.size
|
||||
x1, y1, x2, y2 = RECUEIL_RECODAGE_ZONE
|
||||
reco_zone, _ = _recueil_zones()
|
||||
x1, y1, x2, y2 = reco_zone
|
||||
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
|
||||
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
|
||||
crop.save(crop_path)
|
||||
@@ -267,7 +286,8 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
|
||||
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
|
||||
# à cocher (cf. scratch/test_prompt_crop_v2.py).
|
||||
if ptype == "recueil" and isinstance(parsed, dict):
|
||||
cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD)
|
||||
_, cb_zones = _recueil_zones()
|
||||
cb = detect_accord_desaccord(img_path, cb_zones)
|
||||
parsed["accord_desaccord"] = cb["decision"]
|
||||
parsed["_checkbox_debug"] = cb # ratios + diff pour audit
|
||||
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
"""PDF → images PNG 300 dpi avec cache par hash SHA256."""
|
||||
"""PDF → images PNG 300 dpi avec cache par hash SHA256.
|
||||
|
||||
Applique optionnellement un deskew automatique (redressement) sur chaque page
|
||||
pour corriger le biais d'inclinaison des scans. Voir pipeline/deskew.py.
|
||||
"""
|
||||
import hashlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
from pdf2image import convert_from_path
|
||||
from PIL import Image
|
||||
|
||||
from .deskew import deskew_image, MIN_ANGLE_DEG
|
||||
|
||||
DEFAULT_DPI = 300
|
||||
CACHE_ROOT = Path(".cache/images")
|
||||
|
||||
@@ -18,23 +24,37 @@ def pdf_hash(pdf_path: str) -> str:
|
||||
return h.hexdigest()[:16]
|
||||
|
||||
|
||||
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI, cache_root: Path = CACHE_ROOT) -> list[Path]:
|
||||
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI,
|
||||
cache_root: Path = CACHE_ROOT,
|
||||
deskew: bool = True) -> list[Path]:
|
||||
"""Convertit un PDF en PNG 300 dpi. Retourne la liste des chemins (1 par page).
|
||||
|
||||
Le cache est indexé par hash du PDF : un PDF inchangé n'est jamais reconverti.
|
||||
|
||||
Avec `deskew=True` (défaut), chaque page est redressée si son angle de skew
|
||||
dépasse le seuil défini dans `pipeline.deskew.MIN_ANGLE_DEG` (0.3°). L'angle
|
||||
appliqué est persisté dans un fichier `<page>.skew` à côté (pour audit).
|
||||
"""
|
||||
cache_root = Path(cache_root)
|
||||
h = pdf_hash(pdf_path)
|
||||
out_dir = cache_root / h
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
existing = sorted(out_dir.glob("page_*.png"))
|
||||
# Le glob est strict pour ne pas attraper les crops intermédiaires
|
||||
# (page_XX_recodage.png, etc.)
|
||||
existing = sorted(p for p in out_dir.glob("page_*.png")
|
||||
if p.stem.replace("page_", "").isdigit())
|
||||
if existing:
|
||||
return existing
|
||||
|
||||
pages = convert_from_path(pdf_path, dpi)
|
||||
paths = []
|
||||
for i, img in enumerate(pages, 1):
|
||||
if deskew:
|
||||
img, applied = deskew_image(img)
|
||||
if applied != 0.0:
|
||||
# Trace d'audit : on note l'angle corrigé
|
||||
(out_dir / f"page_{i:02d}.skew").write_text(f"{applied:.3f}\n")
|
||||
p = out_dir / f"page_{i:02d}.png"
|
||||
img.save(p, "PNG", optimize=True)
|
||||
paths.append(p)
|
||||
|
||||
@@ -27,22 +27,64 @@ class QwenVLOCR:
|
||||
|
||||
def _init_model(self):
|
||||
t0 = time.time()
|
||||
import os as _os
|
||||
|
||||
# max_pixels limite le nombre de patches visuels pour éviter l'OOM
|
||||
# sur images 300 dpi (2481x3509). ~800 patches = équilibre qualité/VRAM,
|
||||
# tient confortablement dans ~5-6 Go même avec d'autres processus GPU
|
||||
# en arrière-plan. Configurable via env var QWEN_MAX_PIXELS (en patches).
|
||||
import os as _os
|
||||
max_pixels = int(_os.environ.get("QWEN_MAX_PIXELS", 800)) * 28 * 28
|
||||
self.processor = AutoProcessor.from_pretrained(
|
||||
MODEL_PATH,
|
||||
min_pixels=256 * 28 * 28,
|
||||
max_pixels=max_pixels,
|
||||
)
|
||||
|
||||
# Device : "auto" par défaut (GPU si dispo), "cpu" pour forcer le CPU
|
||||
# quand la VRAM est saturée par d'autres process. Configurable via
|
||||
# QWEN_DEVICE=cpu.
|
||||
device = _os.environ.get("QWEN_DEVICE", "auto").lower()
|
||||
if device == "cpu":
|
||||
# Sur CPU on cherche à maximiser le throughput :
|
||||
# 1. Utiliser tous les cores via torch.set_num_threads (set_num_threads
|
||||
# prime sur OMP_NUM_THREADS pour les ops PyTorch natifs).
|
||||
# 2. Choisir bfloat16 si le CPU le supporte nativement (Zen 5,
|
||||
# Zen 4, Intel Sapphire Rapids+ ont AVX-512 BF16). Sinon float32.
|
||||
n_threads = int(_os.environ.get("TORCH_NUM_THREADS", _os.cpu_count() or 8))
|
||||
torch.set_num_threads(n_threads)
|
||||
try:
|
||||
torch.set_num_interop_threads(n_threads)
|
||||
except RuntimeError:
|
||||
pass # déjà initialisé, ignorer
|
||||
|
||||
# Détection AVX-512 BF16 via /proc/cpuinfo (Linux)
|
||||
use_bf16 = False
|
||||
try:
|
||||
with open("/proc/cpuinfo") as f:
|
||||
flags = f.read()
|
||||
use_bf16 = "avx512_bf16" in flags or "amx_bf16" in flags
|
||||
except Exception:
|
||||
pass
|
||||
dtype = torch.bfloat16 if use_bf16 else torch.float32
|
||||
|
||||
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
|
||||
MODEL_PATH,
|
||||
torch_dtype=dtype,
|
||||
device_map={"": "cpu"},
|
||||
low_cpu_mem_usage=True,
|
||||
)
|
||||
self.device_used = "cpu"
|
||||
self.cpu_threads = n_threads
|
||||
self.cpu_dtype = str(dtype).replace("torch.", "")
|
||||
else:
|
||||
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
|
||||
MODEL_PATH,
|
||||
torch_dtype=torch.bfloat16,
|
||||
device_map="auto",
|
||||
)
|
||||
self.device_used = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
self.cpu_threads = None
|
||||
self.cpu_dtype = None
|
||||
self.model.eval()
|
||||
self.load_time = time.time() - t0
|
||||
self.vram_gb = torch.cuda.memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0
|
||||
|
||||
@@ -28,6 +28,13 @@ import streamlit as st
|
||||
from PIL import Image
|
||||
|
||||
from pipeline.ingest import pdf_to_images
|
||||
from pipeline.zones_config import load_config, save_config, DEFAULT_CONFIG_PATH
|
||||
|
||||
try:
|
||||
from streamlit_drawable_canvas import st_canvas
|
||||
_HAS_CANVAS = True
|
||||
except ImportError:
|
||||
_HAS_CANVAS = False
|
||||
|
||||
|
||||
# ============================================================
|
||||
@@ -268,6 +275,130 @@ def render_page_editor(name: str, ptype: str, extract: dict, gold: dict | None):
|
||||
st.code(page_meta.get("ocr_raw", ""), language="json")
|
||||
|
||||
|
||||
def render_calibration_page():
|
||||
"""Mode 'Calibration zones' : dessine des rectangles à la souris sur une
|
||||
image de référence, sauvegarde dans pipeline/zones_config.json."""
|
||||
st.header("🔧 Calibration des zones")
|
||||
|
||||
if not _HAS_CANVAS:
|
||||
st.error(
|
||||
"Le package `streamlit-drawable-canvas` n'est pas installé.\n"
|
||||
"Installe-le avec : `pip install streamlit-drawable-canvas`"
|
||||
)
|
||||
return
|
||||
|
||||
pdfs = list_pdfs()
|
||||
if not pdfs:
|
||||
st.error("Aucun PDF disponible pour la calibration")
|
||||
return
|
||||
|
||||
col_ctrl, _ = st.columns([1, 3])
|
||||
with col_ctrl:
|
||||
ref_name = st.selectbox(
|
||||
"PDF de référence (bien cadré)",
|
||||
[p.stem for p in pdfs], key="calib_pdf",
|
||||
)
|
||||
page_type = st.selectbox(
|
||||
"Type de page", ["recueil"],
|
||||
help="Aujourd'hui seule la page recueil a des zones configurables",
|
||||
)
|
||||
# Page numéro selon le type (recueil = page 1)
|
||||
page_num = {"recueil": 1}.get(page_type, 1)
|
||||
|
||||
ref_pdf = next(p for p in pdfs if p.stem == ref_name)
|
||||
img_path = pdf_to_images(str(ref_pdf))[page_num - 1]
|
||||
img = Image.open(img_path)
|
||||
img_w, img_h = img.size
|
||||
|
||||
# Charger config existante et préparer les zones
|
||||
cfg = load_config()
|
||||
existing_zones = cfg.get(page_type, {})
|
||||
|
||||
# On scale l'image pour tenir dans le canvas (largeur ~900 px max)
|
||||
canvas_w = 900
|
||||
scale = canvas_w / img_w
|
||||
canvas_h = int(img_h * scale)
|
||||
|
||||
# Préparer les rectangles initiaux depuis la config
|
||||
initial_rects = []
|
||||
for zone_name, z in existing_zones.items():
|
||||
if not isinstance(z, dict): continue
|
||||
initial_rects.append({
|
||||
"type": "rect",
|
||||
"left": z["x1"] * canvas_w,
|
||||
"top": z["y1"] * canvas_h,
|
||||
"width": (z["x2"] - z["x1"]) * canvas_w,
|
||||
"height": (z["y2"] - z["y1"]) * canvas_h,
|
||||
"fill": "rgba(255, 100, 100, 0.15)",
|
||||
"stroke": "red",
|
||||
"strokeWidth": 2,
|
||||
"label_name": zone_name,
|
||||
})
|
||||
|
||||
st.caption(
|
||||
"💡 Dessine un rectangle par zone à la souris. Les zones existantes "
|
||||
"apparaissent déjà pré-dessinées. Tu peux les modifier (drag), "
|
||||
"en ajouter, ou en supprimer (touche Suppr) puis cliquer sur "
|
||||
"**Sauvegarder**."
|
||||
)
|
||||
|
||||
drawing_mode = st.radio(
|
||||
"Mode", ["rect", "transform"], horizontal=True,
|
||||
format_func=lambda x: {"rect": "✏️ Dessiner", "transform": "🖱 Sélectionner / Déplacer"}[x],
|
||||
key="calib_drawing_mode",
|
||||
)
|
||||
|
||||
canvas_result = st_canvas(
|
||||
fill_color="rgba(255, 100, 100, 0.15)",
|
||||
stroke_width=2,
|
||||
stroke_color="red",
|
||||
background_image=img,
|
||||
update_streamlit=True,
|
||||
width=canvas_w,
|
||||
height=canvas_h,
|
||||
drawing_mode=drawing_mode,
|
||||
initial_drawing={"objects": initial_rects, "version": "5.2.1"},
|
||||
key="calib_canvas",
|
||||
)
|
||||
|
||||
# Reconstituer la config à partir des rectangles dessinés
|
||||
rects = (canvas_result.json_data or {}).get("objects", []) if canvas_result.json_data else []
|
||||
|
||||
st.markdown("### Zones détectées")
|
||||
if not rects:
|
||||
st.info("Aucun rectangle dessiné.")
|
||||
return
|
||||
|
||||
new_zones = {}
|
||||
for i, r in enumerate(rects):
|
||||
if r.get("type") != "rect":
|
||||
continue
|
||||
# Récupérer le nom existant si présent, sinon demander
|
||||
default_name = r.get("label_name") or f"zone_{i+1}"
|
||||
name = st.text_input(
|
||||
f"Nom de la zone {i+1}",
|
||||
value=default_name, key=f"calib_name_{i}",
|
||||
)
|
||||
x1 = r["left"] / canvas_w
|
||||
y1 = r["top"] / canvas_h
|
||||
x2 = x1 + r["width"] / canvas_w
|
||||
y2 = y1 + r["height"] / canvas_h
|
||||
desc = existing_zones.get(name, {}).get("description", "")
|
||||
desc = st.text_input(
|
||||
f"Description (optionnel)", value=desc, key=f"calib_desc_{i}",
|
||||
)
|
||||
st.caption(f"Coords relatives : ({x1:.3f}, {y1:.3f}) → ({x2:.3f}, {y2:.3f})")
|
||||
new_zones[name] = {"x1": round(x1, 4), "y1": round(y1, 4),
|
||||
"x2": round(x2, 4), "y2": round(y2, 4),
|
||||
"description": desc}
|
||||
|
||||
if st.button("💾 Sauvegarder la configuration", type="primary"):
|
||||
cfg[page_type] = new_zones
|
||||
path = save_config(cfg)
|
||||
st.success(f"Configuration sauvegardée : {path}")
|
||||
st.json(new_zones)
|
||||
|
||||
|
||||
def main():
|
||||
st.set_page_config(page_title="OGC Overlay", layout="wide")
|
||||
|
||||
@@ -280,6 +411,13 @@ def main():
|
||||
|
||||
st.title("🩺 Extraction OGC — review & gold set")
|
||||
|
||||
# Sélecteur de mode en haut de sidebar
|
||||
with st.sidebar:
|
||||
mode = st.radio("Mode", ["📋 Review dossier", "🔧 Calibration zones"])
|
||||
if mode == "🔧 Calibration zones":
|
||||
render_calibration_page()
|
||||
return
|
||||
|
||||
pdfs = list_pdfs()
|
||||
if not pdfs:
|
||||
st.error(f"Aucun PDF trouvé dans {PDF_DIR}")
|
||||
|
||||
90
pipeline/zones_config.py
Normal file
90
pipeline/zones_config.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""Configuration des zones d'extraction éditable via l'overlay UI.
|
||||
|
||||
Les coordonnées sont relatives (0..1) dans l'image source. Elles sont chargées
|
||||
au démarrage du pipeline et utilisées à la place des constantes en dur dans
|
||||
`pipeline/prompts.py` et `pipeline/checkboxes.py` — avec fallback sur ces
|
||||
constantes si la config n'est pas présente, pour ne pas casser l'existant.
|
||||
|
||||
Structure :
|
||||
{
|
||||
"recueil": {
|
||||
"codage_reco": {"x1":0.77, "y1":0.330, "x2":0.97, "y2":0.490, "description":"..."},
|
||||
"accord_checkbox": {"x1":..., "y1":..., "x2":..., "y2":..., "description":"..."},
|
||||
"desaccord_checkbox":{...}
|
||||
},
|
||||
"concertation_2": {...}
|
||||
}
|
||||
|
||||
Un fichier unique `zones_config.json` à la racine du projet, ou au chemin pointé
|
||||
par la variable d'env `OGC_ZONES_CONFIG`.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
DEFAULT_CONFIG_PATH = Path(
|
||||
os.environ.get("OGC_ZONES_CONFIG", "zones_config.json")
|
||||
)
|
||||
|
||||
|
||||
# Zones par défaut, identiques aux constantes actuelles dans prompts.py et
|
||||
# checkboxes.py. Sert de fallback et de "mise à jour initiale" quand le
|
||||
# fichier n'existe pas encore.
|
||||
DEFAULTS: dict = {
|
||||
"recueil": {
|
||||
"codage_reco": {
|
||||
"x1": 0.77, "y1": 0.330, "x2": 0.97, "y2": 0.490,
|
||||
"description": "Colonne Recodage (DP / DR / DAS) — exclut le bloc Actes",
|
||||
},
|
||||
"accord_checkbox": {
|
||||
"x1": 0.588, "y1": 0.838, "x2": 0.622, "y2": 0.860,
|
||||
"description": "Case à cocher 'Accord'",
|
||||
},
|
||||
"desaccord_checkbox": {
|
||||
"x1": 0.588, "y1": 0.858, "x2": 0.622, "y2": 0.880,
|
||||
"description": "Case à cocher 'Désaccord'",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def load_config(path: Path = DEFAULT_CONFIG_PATH) -> dict:
|
||||
"""Charge la config JSON, ou retourne les defaults si absente."""
|
||||
if not path.exists():
|
||||
return _deep_copy(DEFAULTS)
|
||||
try:
|
||||
raw = json.loads(path.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return _deep_copy(DEFAULTS)
|
||||
# Merge : les defaults sont une base, la config utilisateur vient par-dessus
|
||||
merged = _deep_copy(DEFAULTS)
|
||||
for page, zones in raw.items():
|
||||
merged.setdefault(page, {}).update(zones)
|
||||
return merged
|
||||
|
||||
|
||||
def save_config(cfg: dict, path: Path = DEFAULT_CONFIG_PATH) -> Path:
|
||||
path.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
def get_zone(page_type: str, zone_name: str,
|
||||
config: dict | None = None) -> tuple[float, float, float, float] | None:
|
||||
"""Récupère une zone depuis la config ou les defaults.
|
||||
|
||||
Retourne (x1, y1, x2, y2) ou None si inconnue.
|
||||
"""
|
||||
cfg = config or load_config()
|
||||
z = cfg.get(page_type, {}).get(zone_name)
|
||||
if not isinstance(z, dict):
|
||||
return None
|
||||
try:
|
||||
return (float(z["x1"]), float(z["y1"]), float(z["x2"]), float(z["y2"]))
|
||||
except (KeyError, ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def _deep_copy(d: dict) -> dict:
|
||||
return json.loads(json.dumps(d))
|
||||
Reference in New Issue
Block a user