Compare commits

...

3 Commits

Author SHA1 Message Date
Dom
1255468676 feat(ui): calibration visuelle des zones via dessin à la souris
Nouveau module pipeline/zones_config.py : charge les zones d'extraction
depuis un fichier zones_config.json (coordonnées relatives 0-1), avec
fallback sur les constantes Python. Config partagée entre :
- pipeline/extract.py (crop colonne Recodage)
- pipeline/checkboxes.py (cases Accord/Désaccord)

Zones configurables aujourd'hui (page recueil) :
- codage_reco (crop zonal pour le second passage VLM)
- accord_checkbox / desaccord_checkbox (densité de pixels)

Mode "🔧 Calibration zones" ajouté dans pipeline/ui_overlay.py :
- Sélection d'un PDF de référence (idéalement bien cadré)
- Canvas interactif (streamlit-drawable-canvas) avec les zones
  existantes pré-dessinées en rouge
- Dessin/déplacement/redimensionnement à la souris
- Saisie d'un nom et description par zone
- Sauvegarde en JSON (ou OGC_ZONES_CONFIG si défini)

Permet au métier (Khalid) de recalibrer les zones sans toucher au code,
par exemple si le formulaire ATIH évolue ou si les scans sont d'un autre
établissement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:59 +02:00
Dom
c0b0cd9b87 perf(ocr_qwen): support CPU + bfloat16 AVX-512 + threads explicites
Trois ajouts pour rendre le pipeline utilisable sur CPU quand la VRAM
est saturée par d'autres process :

1. Variable QWEN_DEVICE=cpu pour forcer le device CPU. Le défaut "auto"
   choisit CUDA si dispo, fallback CPU sinon.

2. Sur CPU, détection automatique du support AVX-512 BF16 via /proc/cpuinfo
   (Zen 4/5, Intel Sapphire Rapids+). Si présent, bfloat16 au lieu de
   float32 — divise par 2 la RAM et ~2x plus rapide sur matmul.

3. Appel explicite de torch.set_num_threads(N) et set_num_interop_threads(N)
   (OMP_NUM_THREADS seul ne suffit pas). Configurable via TORCH_NUM_THREADS,
   défaut = os.cpu_count().

Mesure sur Ryzen 9 9950X (Zen 5, 16c/32t, AVX-512 BF16 natif) :
- AVANT : 645% CPU (~6.5 cores), 15 Go RAM (float32)
- APRÈS : 2433% CPU (~24 cores), 8 Go RAM (bfloat16)

Appel `torch.cuda.empty_cache()` en fin d'inférence pour réduire la
fragmentation VRAM quand d'autres process GPU tournent en parallèle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:45 +02:00
Dom
6c8184cc03 feat(deskew): correction automatique du skew au chargement des PDFs
Nouveau module pipeline/deskew.py basé sur cv2.HoughLinesP :
- détecte les lignes quasi-horizontales (±15° de l'horizontale)
- prend la médiane de leurs angles (robuste aux outliers)
- seuils : |angle|>0.3° pour corriger, |angle|>10° = suspect (on ne corrige pas)
- PIL.rotate() avec BICUBIC + fillcolor blanc, sans expand

Intégré dans pipeline/ingest.py (paramètre `deskew=True` par défaut).
L'angle appliqué est tracé dans un fichier `page_XX.skew` à côté de
l'image, pour audit.

Mesuré sur les 18 dossiers de l'échantillon 2018 CARC : seule OGC 1 a
un skew au-dessus du seuil (+0.91°), les 17 autres sont déjà droits.
Le deskew corrige OGC 1 en 0.00° résiduel (vérif visuelle en-tête OK).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:29 +02:00
6 changed files with 447 additions and 12 deletions

125
pipeline/deskew.py Normal file
View File

@@ -0,0 +1,125 @@
"""Détection d'angle de skew + redressement automatique des pages scannées.
Technique : Hough Transform sur les lignes détectées par Canny, puis moyenne
des angles des lignes « quasi horizontales » (±15° par rapport à l'horizontale).
Les fiches OGC ont énormément de traits de tableau → signal très fort.
Seuil : on ne corrige que si |angle| > `MIN_ANGLE_DEG` (0.3° par défaut) pour
éviter de toucher les scans déjà bien cadrés et introduire du bruit inutile.
"""
from __future__ import annotations
from pathlib import Path
from typing import Tuple
import numpy as np
from PIL import Image
try:
import cv2 # type: ignore
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
MIN_ANGLE_DEG = 0.3 # en-dessous, on ne corrige pas
MAX_ANGLE_DEG = 10.0 # au-dessus, c'est anormal → suspect, on ne corrige pas
NEAR_HORIZONTAL_BAND = 15.0 # degrés : bande autour de l'horizontale pour filtrer
def detect_skew_angle(img: Image.Image) -> float:
"""Retourne l'angle de skew en degrés (positif = tourné dans le sens
des aiguilles d'une montre) à appliquer pour redresser l'image.
Si aucune ligne horizontale n'est trouvée, retourne 0.0.
Si l'angle détecté est hors [-MAX_ANGLE_DEG, +MAX_ANGLE_DEG], retourne 0.0
(probablement une erreur de détection, on ne corrige pas).
"""
if not _HAS_CV2:
return 0.0
gray = np.array(img.convert("L"))
# Réduire l'image pour accélérer (max 1500 px de large)
h, w = gray.shape
if w > 1500:
scale = 1500 / w
gray = cv2.resize(gray, (1500, int(h * scale)), interpolation=cv2.INTER_AREA)
# Canny edges — paramètres standards documents
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
# Hough Lines probabiliste : rapide et robuste
lines = cv2.HoughLinesP(
edges, rho=1, theta=np.pi / 180, threshold=200,
minLineLength=gray.shape[1] // 4, # au moins 25% de la largeur
maxLineGap=20,
)
if lines is None or len(lines) == 0:
return 0.0
# Calculer l'angle de chaque ligne en degrés
angles = []
for line in lines:
x1, y1, x2, y2 = line[0]
if x2 == x1:
continue # ligne verticale, ignorée
angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
# On ne garde que les lignes proches de l'horizontale
if abs(angle) < NEAR_HORIZONTAL_BAND:
angles.append(angle)
if not angles:
return 0.0
# Moyenne robuste : médiane plutôt que mean, moins sensible aux outliers
angle = float(np.median(angles))
if abs(angle) > MAX_ANGLE_DEG:
return 0.0 # suspect → on ne corrige pas
return angle
def deskew_image(img: Image.Image,
angle: float | None = None,
min_angle: float = MIN_ANGLE_DEG) -> Tuple[Image.Image, float]:
"""Redresse une image si le skew détecté dépasse `min_angle`.
Retourne (image_eventuellement_rotee, angle_applique).
Si |angle| < min_angle, retourne l'image inchangée et angle=0.0.
"""
if angle is None:
angle = detect_skew_angle(img)
if abs(angle) < min_angle:
return img, 0.0
# PIL.Image.rotate : positive angle = counter-clockwise
# detect_skew retourne positif = clockwise → on inverse pour PIL
rotated = img.rotate(
angle,
resample=Image.Resampling.BICUBIC,
expand=False,
fillcolor="white",
)
return rotated, angle
def deskew_file(src: Path, dst: Path | None = None,
min_angle: float = MIN_ANGLE_DEG) -> float:
"""Version fichier → fichier. Écrase `src` si `dst` est None.
Retourne l'angle appliqué (0.0 si pas de rotation)."""
img = Image.open(src)
rotated, angle = deskew_image(img, min_angle=min_angle)
out = dst or src
rotated.save(out, "PNG", optimize=True)
return angle
if __name__ == "__main__":
import sys
import glob
paths = [Path(p) for p in (sys.argv[1:] or sorted(glob.glob(".cache/images/*/page_01.png")))]
print(f"Deskew sur {len(paths)} images (seuil={MIN_ANGLE_DEG}°)...")
total_corrected = 0
for p in paths:
angle = detect_skew_angle(Image.open(p))
mark = "" if abs(angle) >= MIN_ANGLE_DEG else "·"
if abs(angle) >= MIN_ANGLE_DEG:
total_corrected += 1
print(f" {mark} {p} : {angle:+.2f}°")
print(f"\n{total_corrected}/{len(paths)} images auraient besoin d'un redressement.")

View File

@@ -11,7 +11,8 @@ from .prompts import (
PAGE_TYPES, PROMPT_HEADER, PAGE_TYPES, PROMPT_HEADER,
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
) )
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie, CheckboxZones
from .zones_config import load_config, get_zone
from .validation import annotate as validate_annotate from .validation import annotate as validate_annotate
@@ -108,6 +109,23 @@ def parse_json_output(raw: str) -> dict | None:
return {"_raw": raw, "_parse_error": str(e)} return {"_raw": raw, "_parse_error": str(e)}
def _recueil_zones() -> tuple[tuple, CheckboxZones]:
"""Charge les zones configurables pour la page recueil.
Retourne (recodage_zone, accord_desaccord_zones). Si la config n'a pas
d'entrée, on retombe sur les constantes compilées.
"""
cfg = load_config()
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
acc = get_zone("recueil", "accord_checkbox", cfg)
des = get_zone("recueil", "desaccord_checkbox", cfg)
if acc and des:
cb = CheckboxZones(accord=acc, desaccord=des)
else:
cb = RECUEIL_ACCORD_DESACCORD
return reco, cb
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None: def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
"""Second passage VLM sur le crop zonal de la colonne Recodage. """Second passage VLM sur le crop zonal de la colonne Recodage.
@@ -122,7 +140,8 @@ def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
try: try:
img = Image.open(image_path) img = Image.open(image_path)
w, h = img.size w, h = img.size
x1, y1, x2, y2 = RECUEIL_RECODAGE_ZONE reco_zone, _ = _recueil_zones()
x1, y1, x2, y2 = reco_zone
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h))) crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png" crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path) crop.save(crop_path)
@@ -267,7 +286,8 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases # sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
# à cocher (cf. scratch/test_prompt_crop_v2.py). # à cocher (cf. scratch/test_prompt_crop_v2.py).
if ptype == "recueil" and isinstance(parsed, dict): if ptype == "recueil" and isinstance(parsed, dict):
cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD) _, cb_zones = _recueil_zones()
cb = detect_accord_desaccord(img_path, cb_zones)
parsed["accord_desaccord"] = cb["decision"] parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb # ratios + diff pour audit parsed["_checkbox_debug"] = cb # ratios + diff pour audit
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD" # ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"

View File

@@ -1,10 +1,16 @@
"""PDF → images PNG 300 dpi avec cache par hash SHA256.""" """PDF → images PNG 300 dpi avec cache par hash SHA256.
Applique optionnellement un deskew automatique (redressement) sur chaque page
pour corriger le biais d'inclinaison des scans. Voir pipeline/deskew.py.
"""
import hashlib import hashlib
import os import os
from pathlib import Path from pathlib import Path
from pdf2image import convert_from_path from pdf2image import convert_from_path
from PIL import Image from PIL import Image
from .deskew import deskew_image, MIN_ANGLE_DEG
DEFAULT_DPI = 300 DEFAULT_DPI = 300
CACHE_ROOT = Path(".cache/images") CACHE_ROOT = Path(".cache/images")
@@ -18,23 +24,37 @@ def pdf_hash(pdf_path: str) -> str:
return h.hexdigest()[:16] return h.hexdigest()[:16]
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI, cache_root: Path = CACHE_ROOT) -> list[Path]: def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI,
cache_root: Path = CACHE_ROOT,
deskew: bool = True) -> list[Path]:
"""Convertit un PDF en PNG 300 dpi. Retourne la liste des chemins (1 par page). """Convertit un PDF en PNG 300 dpi. Retourne la liste des chemins (1 par page).
Le cache est indexé par hash du PDF : un PDF inchangé n'est jamais reconverti. Le cache est indexé par hash du PDF : un PDF inchangé n'est jamais reconverti.
Avec `deskew=True` (défaut), chaque page est redressée si son angle de skew
dépasse le seuil défini dans `pipeline.deskew.MIN_ANGLE_DEG` (0.3°). L'angle
appliqué est persisté dans un fichier `<page>.skew` à côté (pour audit).
""" """
cache_root = Path(cache_root) cache_root = Path(cache_root)
h = pdf_hash(pdf_path) h = pdf_hash(pdf_path)
out_dir = cache_root / h out_dir = cache_root / h
out_dir.mkdir(parents=True, exist_ok=True) out_dir.mkdir(parents=True, exist_ok=True)
existing = sorted(out_dir.glob("page_*.png")) # Le glob est strict pour ne pas attraper les crops intermédiaires
# (page_XX_recodage.png, etc.)
existing = sorted(p for p in out_dir.glob("page_*.png")
if p.stem.replace("page_", "").isdigit())
if existing: if existing:
return existing return existing
pages = convert_from_path(pdf_path, dpi) pages = convert_from_path(pdf_path, dpi)
paths = [] paths = []
for i, img in enumerate(pages, 1): for i, img in enumerate(pages, 1):
if deskew:
img, applied = deskew_image(img)
if applied != 0.0:
# Trace d'audit : on note l'angle corrigé
(out_dir / f"page_{i:02d}.skew").write_text(f"{applied:.3f}\n")
p = out_dir / f"page_{i:02d}.png" p = out_dir / f"page_{i:02d}.png"
img.save(p, "PNG", optimize=True) img.save(p, "PNG", optimize=True)
paths.append(p) paths.append(p)

View File

@@ -27,22 +27,64 @@ class QwenVLOCR:
def _init_model(self): def _init_model(self):
t0 = time.time() t0 = time.time()
import os as _os
# max_pixels limite le nombre de patches visuels pour éviter l'OOM # max_pixels limite le nombre de patches visuels pour éviter l'OOM
# sur images 300 dpi (2481x3509). ~800 patches = équilibre qualité/VRAM, # sur images 300 dpi (2481x3509). ~800 patches = équilibre qualité/VRAM,
# tient confortablement dans ~5-6 Go même avec d'autres processus GPU # tient confortablement dans ~5-6 Go même avec d'autres processus GPU
# en arrière-plan. Configurable via env var QWEN_MAX_PIXELS (en patches). # en arrière-plan. Configurable via env var QWEN_MAX_PIXELS (en patches).
import os as _os
max_pixels = int(_os.environ.get("QWEN_MAX_PIXELS", 800)) * 28 * 28 max_pixels = int(_os.environ.get("QWEN_MAX_PIXELS", 800)) * 28 * 28
self.processor = AutoProcessor.from_pretrained( self.processor = AutoProcessor.from_pretrained(
MODEL_PATH, MODEL_PATH,
min_pixels=256 * 28 * 28, min_pixels=256 * 28 * 28,
max_pixels=max_pixels, max_pixels=max_pixels,
) )
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH, # Device : "auto" par défaut (GPU si dispo), "cpu" pour forcer le CPU
torch_dtype=torch.bfloat16, # quand la VRAM est saturée par d'autres process. Configurable via
device_map="auto", # QWEN_DEVICE=cpu.
) device = _os.environ.get("QWEN_DEVICE", "auto").lower()
if device == "cpu":
# Sur CPU on cherche à maximiser le throughput :
# 1. Utiliser tous les cores via torch.set_num_threads (set_num_threads
# prime sur OMP_NUM_THREADS pour les ops PyTorch natifs).
# 2. Choisir bfloat16 si le CPU le supporte nativement (Zen 5,
# Zen 4, Intel Sapphire Rapids+ ont AVX-512 BF16). Sinon float32.
n_threads = int(_os.environ.get("TORCH_NUM_THREADS", _os.cpu_count() or 8))
torch.set_num_threads(n_threads)
try:
torch.set_num_interop_threads(n_threads)
except RuntimeError:
pass # déjà initialisé, ignorer
# Détection AVX-512 BF16 via /proc/cpuinfo (Linux)
use_bf16 = False
try:
with open("/proc/cpuinfo") as f:
flags = f.read()
use_bf16 = "avx512_bf16" in flags or "amx_bf16" in flags
except Exception:
pass
dtype = torch.bfloat16 if use_bf16 else torch.float32
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH,
torch_dtype=dtype,
device_map={"": "cpu"},
low_cpu_mem_usage=True,
)
self.device_used = "cpu"
self.cpu_threads = n_threads
self.cpu_dtype = str(dtype).replace("torch.", "")
else:
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH,
torch_dtype=torch.bfloat16,
device_map="auto",
)
self.device_used = "cuda" if torch.cuda.is_available() else "cpu"
self.cpu_threads = None
self.cpu_dtype = None
self.model.eval() self.model.eval()
self.load_time = time.time() - t0 self.load_time = time.time() - t0
self.vram_gb = torch.cuda.memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0 self.vram_gb = torch.cuda.memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0

View File

@@ -28,6 +28,13 @@ import streamlit as st
from PIL import Image from PIL import Image
from pipeline.ingest import pdf_to_images from pipeline.ingest import pdf_to_images
from pipeline.zones_config import load_config, save_config, DEFAULT_CONFIG_PATH
try:
from streamlit_drawable_canvas import st_canvas
_HAS_CANVAS = True
except ImportError:
_HAS_CANVAS = False
# ============================================================ # ============================================================
@@ -268,6 +275,130 @@ def render_page_editor(name: str, ptype: str, extract: dict, gold: dict | None):
st.code(page_meta.get("ocr_raw", ""), language="json") st.code(page_meta.get("ocr_raw", ""), language="json")
def render_calibration_page():
"""Mode 'Calibration zones' : dessine des rectangles à la souris sur une
image de référence, sauvegarde dans pipeline/zones_config.json."""
st.header("🔧 Calibration des zones")
if not _HAS_CANVAS:
st.error(
"Le package `streamlit-drawable-canvas` n'est pas installé.\n"
"Installe-le avec : `pip install streamlit-drawable-canvas`"
)
return
pdfs = list_pdfs()
if not pdfs:
st.error("Aucun PDF disponible pour la calibration")
return
col_ctrl, _ = st.columns([1, 3])
with col_ctrl:
ref_name = st.selectbox(
"PDF de référence (bien cadré)",
[p.stem for p in pdfs], key="calib_pdf",
)
page_type = st.selectbox(
"Type de page", ["recueil"],
help="Aujourd'hui seule la page recueil a des zones configurables",
)
# Page numéro selon le type (recueil = page 1)
page_num = {"recueil": 1}.get(page_type, 1)
ref_pdf = next(p for p in pdfs if p.stem == ref_name)
img_path = pdf_to_images(str(ref_pdf))[page_num - 1]
img = Image.open(img_path)
img_w, img_h = img.size
# Charger config existante et préparer les zones
cfg = load_config()
existing_zones = cfg.get(page_type, {})
# On scale l'image pour tenir dans le canvas (largeur ~900 px max)
canvas_w = 900
scale = canvas_w / img_w
canvas_h = int(img_h * scale)
# Préparer les rectangles initiaux depuis la config
initial_rects = []
for zone_name, z in existing_zones.items():
if not isinstance(z, dict): continue
initial_rects.append({
"type": "rect",
"left": z["x1"] * canvas_w,
"top": z["y1"] * canvas_h,
"width": (z["x2"] - z["x1"]) * canvas_w,
"height": (z["y2"] - z["y1"]) * canvas_h,
"fill": "rgba(255, 100, 100, 0.15)",
"stroke": "red",
"strokeWidth": 2,
"label_name": zone_name,
})
st.caption(
"💡 Dessine un rectangle par zone à la souris. Les zones existantes "
"apparaissent déjà pré-dessinées. Tu peux les modifier (drag), "
"en ajouter, ou en supprimer (touche Suppr) puis cliquer sur "
"**Sauvegarder**."
)
drawing_mode = st.radio(
"Mode", ["rect", "transform"], horizontal=True,
format_func=lambda x: {"rect": "✏️ Dessiner", "transform": "🖱 Sélectionner / Déplacer"}[x],
key="calib_drawing_mode",
)
canvas_result = st_canvas(
fill_color="rgba(255, 100, 100, 0.15)",
stroke_width=2,
stroke_color="red",
background_image=img,
update_streamlit=True,
width=canvas_w,
height=canvas_h,
drawing_mode=drawing_mode,
initial_drawing={"objects": initial_rects, "version": "5.2.1"},
key="calib_canvas",
)
# Reconstituer la config à partir des rectangles dessinés
rects = (canvas_result.json_data or {}).get("objects", []) if canvas_result.json_data else []
st.markdown("### Zones détectées")
if not rects:
st.info("Aucun rectangle dessiné.")
return
new_zones = {}
for i, r in enumerate(rects):
if r.get("type") != "rect":
continue
# Récupérer le nom existant si présent, sinon demander
default_name = r.get("label_name") or f"zone_{i+1}"
name = st.text_input(
f"Nom de la zone {i+1}",
value=default_name, key=f"calib_name_{i}",
)
x1 = r["left"] / canvas_w
y1 = r["top"] / canvas_h
x2 = x1 + r["width"] / canvas_w
y2 = y1 + r["height"] / canvas_h
desc = existing_zones.get(name, {}).get("description", "")
desc = st.text_input(
f"Description (optionnel)", value=desc, key=f"calib_desc_{i}",
)
st.caption(f"Coords relatives : ({x1:.3f}, {y1:.3f}) → ({x2:.3f}, {y2:.3f})")
new_zones[name] = {"x1": round(x1, 4), "y1": round(y1, 4),
"x2": round(x2, 4), "y2": round(y2, 4),
"description": desc}
if st.button("💾 Sauvegarder la configuration", type="primary"):
cfg[page_type] = new_zones
path = save_config(cfg)
st.success(f"Configuration sauvegardée : {path}")
st.json(new_zones)
def main(): def main():
st.set_page_config(page_title="OGC Overlay", layout="wide") st.set_page_config(page_title="OGC Overlay", layout="wide")
@@ -280,6 +411,13 @@ def main():
st.title("🩺 Extraction OGC — review & gold set") st.title("🩺 Extraction OGC — review & gold set")
# Sélecteur de mode en haut de sidebar
with st.sidebar:
mode = st.radio("Mode", ["📋 Review dossier", "🔧 Calibration zones"])
if mode == "🔧 Calibration zones":
render_calibration_page()
return
pdfs = list_pdfs() pdfs = list_pdfs()
if not pdfs: if not pdfs:
st.error(f"Aucun PDF trouvé dans {PDF_DIR}") st.error(f"Aucun PDF trouvé dans {PDF_DIR}")

90
pipeline/zones_config.py Normal file
View File

@@ -0,0 +1,90 @@
"""Configuration des zones d'extraction éditable via l'overlay UI.
Les coordonnées sont relatives (0..1) dans l'image source. Elles sont chargées
au démarrage du pipeline et utilisées à la place des constantes en dur dans
`pipeline/prompts.py` et `pipeline/checkboxes.py` — avec fallback sur ces
constantes si la config n'est pas présente, pour ne pas casser l'existant.
Structure :
{
"recueil": {
"codage_reco": {"x1":0.77, "y1":0.330, "x2":0.97, "y2":0.490, "description":"..."},
"accord_checkbox": {"x1":..., "y1":..., "x2":..., "y2":..., "description":"..."},
"desaccord_checkbox":{...}
},
"concertation_2": {...}
}
Un fichier unique `zones_config.json` à la racine du projet, ou au chemin pointé
par la variable d'env `OGC_ZONES_CONFIG`.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
DEFAULT_CONFIG_PATH = Path(
os.environ.get("OGC_ZONES_CONFIG", "zones_config.json")
)
# Zones par défaut, identiques aux constantes actuelles dans prompts.py et
# checkboxes.py. Sert de fallback et de "mise à jour initiale" quand le
# fichier n'existe pas encore.
DEFAULTS: dict = {
"recueil": {
"codage_reco": {
"x1": 0.77, "y1": 0.330, "x2": 0.97, "y2": 0.490,
"description": "Colonne Recodage (DP / DR / DAS) — exclut le bloc Actes",
},
"accord_checkbox": {
"x1": 0.588, "y1": 0.838, "x2": 0.622, "y2": 0.860,
"description": "Case à cocher 'Accord'",
},
"desaccord_checkbox": {
"x1": 0.588, "y1": 0.858, "x2": 0.622, "y2": 0.880,
"description": "Case à cocher 'Désaccord'",
},
},
}
def load_config(path: Path = DEFAULT_CONFIG_PATH) -> dict:
"""Charge la config JSON, ou retourne les defaults si absente."""
if not path.exists():
return _deep_copy(DEFAULTS)
try:
raw = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return _deep_copy(DEFAULTS)
# Merge : les defaults sont une base, la config utilisateur vient par-dessus
merged = _deep_copy(DEFAULTS)
for page, zones in raw.items():
merged.setdefault(page, {}).update(zones)
return merged
def save_config(cfg: dict, path: Path = DEFAULT_CONFIG_PATH) -> Path:
path.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
return path
def get_zone(page_type: str, zone_name: str,
config: dict | None = None) -> tuple[float, float, float, float] | None:
"""Récupère une zone depuis la config ou les defaults.
Retourne (x1, y1, x2, y2) ou None si inconnue.
"""
cfg = config or load_config()
z = cfg.get(page_type, {}).get(zone_name)
if not isinstance(z, dict):
return None
try:
return (float(z["x1"]), float(z["y1"]), float(z["x2"]), float(z["y2"]))
except (KeyError, ValueError, TypeError):
return None
def _deep_copy(d: dict) -> dict:
return json.loads(json.dumps(d))