Compare commits
6 commits: b47f5c47e0...main

SHA1:
- e55daf275e
- 3a87751444
- d326524e49
- 1255468676
- c0b0cd9b87
- 6c8184cc03
pipeline/deskew.py — new file (+125 lines)
@@ -0,0 +1,125 @@
"""Skew-angle detection + automatic straightening of scanned pages.

Technique: Hough transform on the lines found by Canny edge detection, then
the median angle of the "near-horizontal" lines (within ±15° of horizontal).
OGC forms are full of table rules, so the signal is very strong.

Threshold: we only correct when |angle| > `MIN_ANGLE_DEG` (0.3° by default),
to avoid touching scans that are already well aligned and adding needless noise.
"""
from __future__ import annotations

from pathlib import Path
from typing import Tuple

import numpy as np
from PIL import Image

try:
    import cv2  # type: ignore
    _HAS_CV2 = True
except ImportError:
    _HAS_CV2 = False


MIN_ANGLE_DEG = 0.3          # below this, we do not correct
MAX_ANGLE_DEG = 10.0         # above this, something is off → suspicious, we do not correct
NEAR_HORIZONTAL_BAND = 15.0  # degrees: band around horizontal used for filtering


def detect_skew_angle(img: Image.Image) -> float:
    """Return the skew angle in degrees (positive = rotated clockwise) to
    apply in order to straighten the image.

    If no horizontal line is found, returns 0.0.
    If the detected angle is outside [-MAX_ANGLE_DEG, +MAX_ANGLE_DEG], returns 0.0
    (most likely a detection error, so we do not correct).
    """
    if not _HAS_CV2:
        return 0.0
    gray = np.array(img.convert("L"))
    # Downscale to speed things up (max 1500 px wide)
    h, w = gray.shape
    if w > 1500:
        scale = 1500 / w
        gray = cv2.resize(gray, (1500, int(h * scale)), interpolation=cv2.INTER_AREA)

    # Canny edges — standard parameters for documents
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    # Probabilistic Hough lines: fast and robust
    lines = cv2.HoughLinesP(
        edges, rho=1, theta=np.pi / 180, threshold=200,
        minLineLength=gray.shape[1] // 4,  # at least 25% of the width
        maxLineGap=20,
    )
    if lines is None or len(lines) == 0:
        return 0.0

    # Compute each line's angle in degrees
    angles = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if x2 == x1:
            continue  # vertical line, ignored
        angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
        # Keep only lines close to horizontal
        if abs(angle) < NEAR_HORIZONTAL_BAND:
            angles.append(angle)

    if not angles:
        return 0.0

    # Robust average: median rather than mean, less sensitive to outliers
    angle = float(np.median(angles))
    if abs(angle) > MAX_ANGLE_DEG:
        return 0.0  # suspicious → do not correct
    return angle


def deskew_image(img: Image.Image,
                 angle: float | None = None,
                 min_angle: float = MIN_ANGLE_DEG) -> Tuple[Image.Image, float]:
    """Straighten an image when the detected skew exceeds `min_angle`.

    Returns (possibly_rotated_image, applied_angle).
    If |angle| < min_angle, returns the image unchanged and angle=0.0.
    """
    if angle is None:
        angle = detect_skew_angle(img)
    if abs(angle) < min_angle:
        return img, 0.0
    # PIL.Image.rotate: positive angle = counter-clockwise.
    # detect_skew_angle returns positive = clockwise, so passing the angle
    # straight through rotates the opposite way and undoes the skew.
    rotated = img.rotate(
        angle,
        resample=Image.Resampling.BICUBIC,
        expand=False,
        fillcolor="white",
    )
    return rotated, angle


def deskew_file(src: Path, dst: Path | None = None,
                min_angle: float = MIN_ANGLE_DEG) -> float:
    """File-to-file variant. Overwrites `src` when `dst` is None.
    Returns the applied angle (0.0 if no rotation)."""
    img = Image.open(src)
    rotated, angle = deskew_image(img, min_angle=min_angle)
    out = dst or src
    rotated.save(out, "PNG", optimize=True)
    return angle


if __name__ == "__main__":
    import sys
    import glob
    paths = [Path(p) for p in (sys.argv[1:] or sorted(glob.glob(".cache/images/*/page_01.png")))]
    print(f"Deskew on {len(paths)} images (threshold={MIN_ANGLE_DEG}°)...")
    total_corrected = 0
    for p in paths:
        angle = detect_skew_angle(Image.open(p))
        mark = "→" if abs(angle) >= MIN_ANGLE_DEG else "·"
        if abs(angle) >= MIN_ANGLE_DEG:
            total_corrected += 1
        print(f"  {mark} {p} : {angle:+.2f}°")
    print(f"\n{total_corrected}/{len(paths)} images would need straightening.")
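A minimal batch-usage sketch for the helpers above — deskew_file corrects in place when no destination is given, and the glob mirrors the __main__ block:

from pathlib import Path
from pipeline.deskew import deskew_file

for p in sorted(Path(".cache/images").glob("*/page_01.png")):
    applied = deskew_file(p)  # overwrites p when the skew exceeds MIN_ANGLE_DEG
    if applied != 0.0:
        print(f"{p}: corrected by {applied:+.2f}°")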
@@ -1,211 +1,84 @@
"""Extraction orchestration for one OGC dossier."""
import json
import re
"""Extraction orchestration for one OGC dossier.

Chains the pipeline stages without knowing their internal implementation:

ingest → routing → page-by-page OCR → page-specific enrichment → ATIH validation

The orchestration itself contains no business logic: it delegates to
`pipeline.recueil`, `pipeline.validation`, `pipeline.classify`, `pipeline.ocr_qwen`
and `pipeline.prompts`. This keeps each stage independently testable.
"""
from __future__ import annotations

import time
from pathlib import Path
from PIL import Image
from .ingest import pdf_to_images

from .classify import detect_page_type, route_by_index
from .ingest import pdf_to_images
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import (
    PAGE_TYPES, PROMPT_HEADER,
    SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
)
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie
from .prompts import PAGE_TYPES, PROMPT_HEADER
from .recueil import enrich_recueil, resolve_recueil_zones
from .validation import annotate as validate_annotate


_EMPTY_OBJ_PATTERN = re.compile(
    r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
    re.DOTALL,
)
def _run_page_ocr(ocr: QwenVLOCR, image_path: Path, ptype: str) -> tuple[dict | None, str, float]:
    """Runs the main prompt associated with a page type and parses the JSON.


def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
    """Detects and truncates runaway loops of empty objects.

    GLM-OCR can loop on `{"code":"", "position":"", "libelle":""}` when a
    DAS or acts table is empty in the image. The output is then cut off at
    `max_new_tokens` without closing the JSON → parse error.
    We keep at most `max_consecutive` empty objects, then cut.
    Returns (parsed_dict_or_None, ocr_raw, elapsed_s). `parsed=None` when
    the page has no associated structured prompt (concertation_med, hospit.).
    """
    matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
    if len(matches) <= max_consecutive:
        return text
    # Cut right after the end of the `max_consecutive`-th match
    cut_at = matches[max_consecutive - 1].end()
    return text[:cut_at]


def _close_open_json(text: str) -> str:
    """Adds the missing brackets/braces to try to close a truncated JSON."""
    # Count unbalanced brackets, ignoring those inside quoted strings
    depth_brace = 0
    depth_bracket = 0
    in_string = False
    escape = False
    for c in text:
        if escape:
            escape = False
            continue
        if c == "\\":
            escape = True
            continue
        if c == '"':
            in_string = not in_string
            continue
        if in_string:
            continue
        if c == "{": depth_brace += 1
        elif c == "}": depth_brace -= 1
        elif c == "[": depth_bracket += 1
        elif c == "]": depth_bracket -= 1
    # Drop trailing commas
    closed = text.rstrip().rstrip(",")
    # Close open square brackets first (arrays), then braces
    closed += "]" * max(0, depth_bracket)
    closed += "}" * max(0, depth_brace)
    return closed


def parse_json_output(raw: str) -> dict | None:
    """Tries to extract JSON from the GLM-OCR output.

    Successive strategies:
    1. direct parse after removing the ```json fences
    2. patching missing commas between objects / arrays
    3. detecting and truncating loops of empty objects (frequent on empty
       DAS/acts tables → loops until max_new_tokens)
    4. closing JSON structures left open after truncation
    """
    if not raw:
        return None
    text = raw.strip()
    # 1) markdown fences
    text = re.sub(r"^```(?:json)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # 2) missing commas between `} {` and `] [`
    patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text)
    patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched)
    try:
        return json.loads(patched)
    except json.JSONDecodeError:
        pass

    # 3) truncate loops of empty objects, then 4) close the JSON
    trimmed = _truncate_empty_loop(patched)
    closed = _close_open_json(trimmed)
    try:
        result = json.loads(closed)
        result["_truncated_loop"] = True  # trace of the intervention
        return result
    except json.JSONDecodeError as e:
        return {"_raw": raw, "_parse_error": str(e)}


def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
    """Second VLM pass on the zonal crop of the Recodage column.

    Qwen returns the raw list of every visible code (with its position when
    present). We classify DP/DR/DAS in Python by rules:
    - the 1st and 2nd codes WITHOUT a position → DP then DR (DR may be empty
      if the 2nd code already has a position).
    - every code WITH a position → DAS.

    Returns a dict {dp, dr, das[]} or None on failure.
    """
    try:
        img = Image.open(image_path)
        w, h = img.size
        x1, y1, x2, y2 = RECUEIL_RECODAGE_ZONE
        crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
        crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
        crop.save(crop_path)
    except Exception:
        return None

    res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
    conf = PAGE_TYPES.get(ptype)
    if not conf or conf["prompt"] == PROMPT_HEADER:
        return None, "", 0.0
    res = ocr.run(image_path, conf["prompt"], max_new_tokens=4096)
    parsed = parse_json_output(res["text"])
    if not isinstance(parsed, dict) or "_parse_error" in parsed:
        return None

    # Filter: keep only codes in CIM-10 format. If the crop still spills
    # into the Acts block, the CCAM codes (4 letters + 3 digits) will be
    # excluded here.
    cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
    codes_raw = parsed.get("codes") or []
    codes = []
    for c in codes_raw:
        if not isinstance(c, dict): continue
        code = (c.get("code") or "").strip()
        if code and cim10_re.match(code):
            codes.append({
                "code": code,
                "position": str(c.get("position") or "").strip(),
            })

    # Classify by business rule:
    # - 1st code without a position → DP
    # - 2nd code without a position → DR (unless identical to the DP: Qwen
    #   tends to duplicate the DP when DR is empty — we prefer DR="")
    # - codes with a position → DAS
    dp, dr = "", ""
    das = []
    dp_assigned = dr_assigned = False
    for c in codes:
        code, position = c["code"], c["position"]
        if not position:
            if not dp_assigned:
                dp, dp_assigned = code, True
            elif not dr_assigned:
                if code == dp:
                    # duplicate of the DP → treat DR as empty
                    dr_assigned = True
                else:
                    dr, dr_assigned = code, True
            else:
                das.append({"code": code, "position": ""})
        else:
            das.append(c)
    return {
        "dp": dp, "dr": dr, "das": das,
        "_source": "crop_recodage",
        "_elapsed_s": round(res["elapsed_s"], 2),
        "_n_codes_raw": len(codes_raw),
        "_n_codes_kept": len(codes),
    }
    return parsed, res["text"], round(res["elapsed_s"], 2)


def _merge_codage_reco(parsed: dict, reco: dict) -> None:
    """Merges the Recodage crop result into parsed["codage_reco"].
def _resolve_routing(images: list[Path], ocr: QwenVLOCR,
                     use_standard_routing: bool,
                     verbose: bool) -> tuple[list[str | None], list[str]]:
    """Determines the type of each page, either by the standard order (a
    single check on page 1) or by page-by-page OCR classification.

    Policy: the crop is more reliable (isolated context), so it wins over
    the main pass EXCEPT when the crop leaves empty a field that the main
    pass had read correctly.
    Returns (page_types, headers). `headers[i]` is empty when no classify
    was run on page i.
    """
    existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
    merged = {
        "dp": reco.get("dp", "") or existing.get("dp", ""),
        "dr": reco.get("dr", "") or existing.get("dr", ""),
        "das": reco.get("das") or existing.get("das") or [],
    }
    parsed["codage_reco"] = merged
    parsed.setdefault("_crop_recodage", {})["result"] = reco
    page_types: list[str | None] = [None] * len(images)
    headers: list[str] = [""] * len(images)

    if use_standard_routing and images:
        ptype1, header1 = detect_page_type(images[0], ocr)
        if ptype1 == "recueil":
            page_types = list(route_by_index(len(images)))
            headers[0] = header1
            if verbose:
                print("  standard routing (page 1 = recueil OK)")
            return page_types, headers
        if verbose:
            print(f"  page 1 = {ptype1} → falling back to classification")

    # Fallback: classify page by page
    for i, img in enumerate(images):
        page_types[i], headers[i] = detect_page_type(img, ocr)
    return page_types, headers


def extract_dossier(pdf_path: str | Path, verbose: bool = True,
                    use_standard_routing: bool = True) -> dict:
    """Full pipeline for one dossier: PDF → structured JSON.
    """Full pipeline for one dossier: PDF → structured JSON + ATIH annotations.

    use_standard_routing=True (default): routes pages by index following the
    standard OGC order (6 pages), with no classification OCR. -50% runtime.
    Only checks page 1 to make sure we really start with "recueil" — if not,
    switches to full classification (fallback).
    Stages:
    1. `ingest.pdf_to_images`: PDF → 300 dpi PNG (with auto deskew, cached)
    2. `_resolve_routing`: type of each page
    3. `_run_page_ocr`: OCR of the structured schema for each page type
    4. `recueil.enrich_recueil`: checkboxes + Recodage crop for the recueil page
    5. `validation.annotate`: ATIH validation of every extracted code

    The `use_standard_routing=True` parameter exploits the standard 6-page
    OGC order and saves 5 OCR calls per dossier. Automatically falls back to
    page-by-page classification when page 1 is not the expected recueil.
    """
    pdf_path = Path(pdf_path)
    ocr = QwenVLOCR()

@@ -216,37 +89,22 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
    if verbose:
        print(f"[{pdf_path.name}] {len(images)} pages converted")

    # Routing strategy choice
    page_types = [None] * len(images)
    headers = [""] * len(images)
    if use_standard_routing:
        # Quick check on page 1 (the only classification OCR)
        ptype1, header1 = detect_page_type(images[0], ocr)
        if ptype1 == "recueil":
            page_types = route_by_index(len(images))
            headers[0] = header1
            if verbose:
                print(f"  standard routing (page 1 = recueil OK)")
        else:
            if verbose:
                print(f"  page 1 = {ptype1} → falling back to classification")
            use_standard_routing = False
    page_types, headers = _resolve_routing(images, ocr, use_standard_routing, verbose)

    result = {
    _, cb_zones = resolve_recueil_zones()

    result: dict = {
        "fichier": pdf_path.stem,
        "pdf_hash": images[0].parent.name,
        "pdf_hash": images[0].parent.name if images else "",
        "pages": [],
        "extraction": {},
    }

    for idx, img_path in enumerate(images, 1):
        t0 = time.time()
        if use_standard_routing:
            ptype = page_types[idx - 1]
            header_text = headers[idx - 1]
        else:
            ptype, header_text = detect_page_type(img_path, ocr)
        page_info = {
        page_info: dict = {
            "page": idx,
            "type": ptype,
            "header": header_text.strip(),
@@ -255,42 +113,20 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
        if verbose:
            print(f"  p{idx}: {ptype}")

        prompt_conf = PAGE_TYPES.get(ptype)
        if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER:
            res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096)
            parsed = parse_json_output(res["text"])
            page_info["ocr_raw"] = res["text"]
        parsed, ocr_raw, elapsed = _run_page_ocr(ocr, img_path, ptype) if ptype else (None, "", 0.0)
        if parsed is not None:
            page_info["ocr_raw"] = ocr_raw
            page_info["parsed"] = parsed
            page_info["elapsed_s"] = round(res["elapsed_s"], 2)
            page_info["elapsed_s"] = elapsed

            # Enrichment: checkboxes + boolean-field normalization on the
            # recueil sheet. GLM-OCR / Qwen do not read checkboxes
            # (cf. scratch/test_prompt_crop_v2.py).
            if ptype == "recueil" and isinstance(parsed, dict):
                cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD)
                parsed["accord_desaccord"] = cb["decision"]
                parsed["_checkbox_debug"] = cb  # ratios + diff, for audit
                # ghs_injustifie: Qwen sometimes returns "0 SE 1 2 3 4 ATU FFM FSD"
                # → keep only the leading 0/1 digit
                parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))

                # Second pass: crop of the Recodage column to compensate the
                # under-extraction observed on codage_reco.* in the main pass.
                reco = _extract_recodage_crop(img_path, ocr)
                if reco:
                    _merge_codage_reco(parsed, reco)

                enrich_recueil(parsed, img_path, ocr, cb_zones)
            page_info["parsed"] = parsed

            # Index by type for direct access in result["extraction"]
            result["extraction"][ptype] = parsed
        else:
            # Unstructured pages: just the already-OCRed header
            page_info["elapsed_s"] = round(time.time() - t0, 2)

        result["pages"].append(page_info)

    # Post-processing: ATIH validation of every extracted code
    result = validate_annotate(result)

    return result
    return validate_annotate(result)
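A minimal driver sketch for the new entry point — the module path `pipeline.extract` and the output location are assumptions, not shown in the diff:

import json
from pathlib import Path
from pipeline.extract import extract_dossier  # module path assumed

result = extract_dossier("2018 CARC/OGC 1.pdf", use_standard_routing=True)
out = Path("out") / f"{result['fichier']}.json"
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"{result['fichier']}: {len(result['pages'])} pages, types={list(result['extraction'])}")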
@@ -1,10 +1,16 @@
"""PDF → 300 dpi PNG images, cached by SHA256 hash."""
"""PDF → 300 dpi PNG images, cached by SHA256 hash.

Optionally applies an automatic deskew (straightening) to every page to
correct the tilt of scanned pages. See pipeline/deskew.py.
"""
import hashlib
import os
from pathlib import Path
from pdf2image import convert_from_path
from PIL import Image

from .deskew import deskew_image, MIN_ANGLE_DEG

DEFAULT_DPI = 300
CACHE_ROOT = Path(".cache/images")

@@ -18,23 +24,37 @@ def pdf_hash(pdf_path: str) -> str:
    return h.hexdigest()[:16]


def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI, cache_root: Path = CACHE_ROOT) -> list[Path]:
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI,
                  cache_root: Path = CACHE_ROOT,
                  deskew: bool = True) -> list[Path]:
    """Converts a PDF to 300 dpi PNGs. Returns the list of paths (1 per page).

    The cache is indexed by the PDF's hash: an unchanged PDF is never reconverted.

    With `deskew=True` (default), each page is straightened when its skew
    angle exceeds the threshold defined in `pipeline.deskew.MIN_ANGLE_DEG`
    (0.3°). The applied angle is persisted in a `<page>.skew` file alongside
    the image (for audit).
    """
    cache_root = Path(cache_root)
    h = pdf_hash(pdf_path)
    out_dir = cache_root / h
    out_dir.mkdir(parents=True, exist_ok=True)

    existing = sorted(out_dir.glob("page_*.png"))
    # The glob is strict so we do not pick up intermediate crops
    # (page_XX_recodage.png, etc.)
    existing = sorted(p for p in out_dir.glob("page_*.png")
                      if p.stem.replace("page_", "").isdigit())
    if existing:
        return existing

    pages = convert_from_path(pdf_path, dpi)
    paths = []
    for i, img in enumerate(pages, 1):
        if deskew:
            img, applied = deskew_image(img)
            if applied != 0.0:
                # Audit trail: record the corrected angle
                (out_dir / f"page_{i:02d}.skew").write_text(f"{applied:.3f}\n")
        p = out_dir / f"page_{i:02d}.png"
        img.save(p, "PNG", optimize=True)
        paths.append(p)
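A usage sketch under the new signature — the sample path follows the "2018 CARC/OGC n.pdf" naming used in the tests further down:

from pipeline.ingest import pdf_to_images

pages = pdf_to_images("2018 CARC/OGC 1.pdf", deskew=True)
# → [.cache/images/<hash>/page_01.png, ...]; pages whose skew was corrected
#   get a sibling page_XX.skew file recording the applied angle
print(f"{len(pages)} cached pages, first: {pages[0]}")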
pipeline/json_utils.py — new file (+136 lines)
@@ -0,0 +1,136 @@
"""Tolerant JSON parsing for VLM outputs.

VLMs (Qwen, GLM-OCR, GOT-OCR…) produce JSON with recurring anomalies:

- Wrapped in markdown fences ```json ... ```
- Missing commas between objects or array elements
- Pathological loops of empty objects `{"code":"","position":""}` repeated
  up to `max_new_tokens`, which truncates the JSON without closing it cleanly

This module exposes `parse_json_output()`, which applies several recovery
strategies before giving up. As a last resort it returns a dict carrying
`_raw` + `_parse_error` for auditing, rather than `None` (which would break
the pipeline).
"""
from __future__ import annotations

import json
import re


# Pattern matching a generic empty object {"code":"","position":"",...}
_EMPTY_OBJ_PATTERN = re.compile(
    r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
    re.DOTALL,
)

_FENCE_OPEN_RE = re.compile(r"^```(?:json)?\s*")
_FENCE_CLOSE_RE = re.compile(r"\s*```$")
_MISSING_COMMA_OBJ_RE = re.compile(r"\}\s*\n(\s*\{)")
_MISSING_COMMA_ARR_RE = re.compile(r"\]\s*\n(\s*\[)")


def strip_fences(text: str) -> str:
    """Removes an optional ```json ... ``` wrapper."""
    text = _FENCE_OPEN_RE.sub("", text.strip())
    text = _FENCE_CLOSE_RE.sub("", text)
    return text


def patch_missing_commas(text: str) -> str:
    """Adds the missing commas between `}\\n{` and `]\\n[`.

    VLMs frequently omit these commas in their JSON outputs.
    """
    text = _MISSING_COMMA_OBJ_RE.sub(r"},\n\1", text)
    text = _MISSING_COMMA_ARR_RE.sub(r"],\n\1", text)
    return text


def truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
    """Truncates loops of empty objects (`{"code":"","position":""}` repeated).

    Use case: when a DAS or Acts table is empty in the image, the VLM
    sometimes generates the same empty object over and over until it
    saturates `max_new_tokens`. The output is then cut off without closing
    the JSON → parse error. Keep at most `max_consecutive` occurrences.
    """
    matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
    if len(matches) <= max_consecutive:
        return text
    cut_at = matches[max_consecutive - 1].end()
    return text[:cut_at]


def close_open_json(text: str) -> str:
    """Adds the missing brackets/braces to close a truncated JSON.

    Counts unbalanced braces and square brackets, ignoring those inside
    strings, then closes them in the right order (open arrays first,
    then objects).
    """
    depth_brace = 0
    depth_bracket = 0
    in_string = False
    escape = False
    for c in text:
        if escape:
            escape = False
            continue
        if c == "\\":
            escape = True
            continue
        if c == '"':
            in_string = not in_string
            continue
        if in_string:
            continue
        if c == "{":
            depth_brace += 1
        elif c == "}":
            depth_brace -= 1
        elif c == "[":
            depth_bracket += 1
        elif c == "]":
            depth_bracket -= 1
    closed = text.rstrip().rstrip(",")
    closed += "]" * max(0, depth_bracket)
    closed += "}" * max(0, depth_brace)
    return closed


def parse_json_output(raw: str) -> dict | None:
    """Parses a VLM output into a dict. Applies several strategies:

    1. strip the ```json markdown fences
    2. direct parse
    3. patch missing commas
    4. truncate loops of empty objects + close the JSON

    If every strategy fails, returns `{"_raw": raw, "_parse_error": str}`
    to allow manual auditing instead of breaking the pipeline.
    """
    if not raw:
        return None
    text = strip_fences(raw)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    patched = patch_missing_commas(text)
    try:
        return json.loads(patched)
    except json.JSONDecodeError:
        pass

    trimmed = truncate_empty_loop(patched)
    closed = close_open_json(trimmed)
    try:
        result = json.loads(closed)
        if isinstance(result, dict):
            result["_truncated_loop"] = True
        return result
    except json.JSONDecodeError as e:
        return {"_raw": raw, "_parse_error": str(e)}
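A quick end-to-end check of the recovery ladder on a synthetic looped output (strategies 3 and 4 above):

from pipeline.json_utils import parse_json_output

# Fenced, comma-less, and cut off mid-array by an empty-object loop:
raw = '```json\n{"das": [\n' + '{"code":"","position":""}\n' * 6
out = parse_json_output(raw)
assert out["_truncated_loop"] is True
assert out["das"] == [{"code": "", "position": ""}] * 2  # loop capped at 2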
@@ -27,22 +27,64 @@ class QwenVLOCR:
    def _init_model(self):
        t0 = time.time()
        import os as _os

        # max_pixels caps the number of visual patches to avoid OOM on
        # 300 dpi images (2481x3509). ~800 patches = quality/VRAM balance,
        # fits comfortably in ~5-6 GB even with other GPU processes running
        # in the background. Configurable via the QWEN_MAX_PIXELS env var
        # (in patches).
        import os as _os
        max_pixels = int(_os.environ.get("QWEN_MAX_PIXELS", 800)) * 28 * 28
        self.processor = AutoProcessor.from_pretrained(
            MODEL_PATH,
            min_pixels=256 * 28 * 28,
            max_pixels=max_pixels,
        )

        # Device: "auto" by default (GPU if available), "cpu" to force the
        # CPU when VRAM is saturated by other processes. Configurable via
        # QWEN_DEVICE=cpu.
        device = _os.environ.get("QWEN_DEVICE", "auto").lower()
        if device == "cpu":
            # On CPU we try to maximize throughput:
            # 1. Use all cores via torch.set_num_threads (set_num_threads
            #    takes precedence over OMP_NUM_THREADS for native PyTorch ops).
            # 2. Pick bfloat16 when the CPU supports it natively (Zen 5,
            #    Zen 4, Intel Sapphire Rapids+ have AVX-512 BF16). Otherwise float32.
            n_threads = int(_os.environ.get("TORCH_NUM_THREADS", _os.cpu_count() or 8))
            torch.set_num_threads(n_threads)
            try:
                torch.set_num_interop_threads(n_threads)
            except RuntimeError:
                pass  # already initialized, ignore

            # AVX-512 BF16 detection via /proc/cpuinfo (Linux)
            use_bf16 = False
            try:
                with open("/proc/cpuinfo") as f:
                    flags = f.read()
                use_bf16 = "avx512_bf16" in flags or "amx_bf16" in flags
            except Exception:
                pass
            dtype = torch.bfloat16 if use_bf16 else torch.float32

            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                MODEL_PATH,
                torch_dtype=dtype,
                device_map={"": "cpu"},
                low_cpu_mem_usage=True,
            )
            self.device_used = "cpu"
            self.cpu_threads = n_threads
            self.cpu_dtype = str(dtype).replace("torch.", "")
        else:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                MODEL_PATH,
                torch_dtype=torch.bfloat16,
                device_map="auto",
            )
            self.device_used = "cuda" if torch.cuda.is_available() else "cpu"
            self.cpu_threads = None
            self.cpu_dtype = None
        self.model.eval()
        self.load_time = time.time() - t0
        self.vram_gb = torch.cuda.memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0
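A sketch of forcing the CPU path through the env vars read above, set before the model loads (the `pipeline.ocr_qwen` module path is assumed from the relative imports elsewhere in the diff):

import os
os.environ["QWEN_DEVICE"] = "cpu"        # skip the GPU even if available
os.environ["TORCH_NUM_THREADS"] = "16"   # cap PyTorch intra-op threads
os.environ["QWEN_MAX_PIXELS"] = "640"    # in 28x28 visual patches

from pipeline.ocr_qwen import QwenVLOCR  # module path assumed
ocr = QwenVLOCR()
print(ocr.device_used, ocr.cpu_threads, ocr.cpu_dtype)  # e.g. cpu 16 float32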
pipeline/recueil.py — new file (+203 lines)
@@ -0,0 +1,203 @@
"""Logic specific to the `recueil` page (the OGC medical data-collection sheet).

Groups everything related to this page — the richest and hardest page of the
OGC dossier to extract — separately from the general orchestration:

- Resolution of the configurable zones (Recodage crop, checkboxes)
- Second VLM pass on the crop of the Recodage column
- Merging the crop result into the main JSON
- Classification of CIM-10 codes into DP/DR/DAS by business rule
- Post-extraction enrichment (Accord/Désaccord checkboxes, ghs_injustifie)

The functions are testable independently of Qwen when they are given the
raw OCR outputs directly.
"""
from __future__ import annotations

import re
import time
from pathlib import Path
from typing import Any

from PIL import Image

from .checkboxes import (
    CheckboxZones,
    RECUEIL_ACCORD_DESACCORD,
    detect_accord_desaccord,
    parse_ghs_injustifie,
)
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE
from .zones_config import get_zone, load_config


# ============================================================
# Zone resolution (JSON config + fallback to the defaults)
# ============================================================

def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]:
    """Loads the recueil-page zones from the user config, falling back to
    the compiled-in constants when the config is absent.

    Returns (recodage_crop_zone, accord_desaccord_zones).
    """
    cfg = load_config()
    reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
    acc = get_zone("recueil", "accord_checkbox", cfg)
    des = get_zone("recueil", "desaccord_checkbox", cfg)
    if acc and des:
        cb = CheckboxZones(accord=acc, desaccord=des)
    else:
        cb = RECUEIL_ACCORD_DESACCORD
    return reco, cb


# ============================================================
# CIM-10 classification → DP / DR / DAS (pure, testable without a VLM)
# ============================================================

CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")


def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]:
    """Filters a list of raw OCR codes, keeping only CIM-10 ones.

    VLMs can occasionally read CCAM (acts) codes in a crop that spills onto
    the Acts block. We drop them here so they do not pollute the DAS.
    """
    kept = []
    for c in codes_raw or []:
        if not isinstance(c, dict):
            continue
        code = (c.get("code") or "").strip()
        if code and CIM10_RE.match(code):
            kept.append({
                "code": code,
                "position": str(c.get("position") or "").strip(),
            })
    return kept


def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]:
    """Classifies a list of {code, position} codes into DP, DR and a DAS list.

    Business rule:
    - 1st code without a position → DP
    - 2nd code without a position → DR (ignored if identical to the DP: the
      VLM may duplicate the DP when the DR box is visually empty)
    - every code with a position → DAS
    - codes without a position beyond the 2nd → DAS without a position (so
      nothing is lost)
    """
    dp, dr = "", ""
    das: list[dict] = []
    dp_assigned = dr_assigned = False
    for c in codes:
        code, position = c["code"], c["position"]
        if not position:
            if not dp_assigned:
                dp, dp_assigned = code, True
            elif not dr_assigned:
                if code == dp:
                    dr_assigned = True  # duplicate of DP → DR stays empty
                else:
                    dr, dr_assigned = code, True
            else:
                das.append({"code": code, "position": ""})
        else:
            das.append(c)
    return dp, dr, das
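A worked example of the rule above (pure function, no VLM involved; the codes are illustrative):

from pipeline.recueil import classify_codes_dp_dr_das

codes = [
    {"code": "I10", "position": ""},   # 1st without a position → DP
    {"code": "I10", "position": ""},   # duplicate of the DP → DR stays empty
    {"code": "E11", "position": "2"},  # positioned → DAS
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert (dp, dr, das) == ("I10", "", [{"code": "E11", "position": "2"}])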
# ============================================================
# Second VLM pass on the Recodage crop
# ============================================================

def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR,
                           zone: tuple[float, float, float, float] | None = None
                           ) -> dict | None:
    """Runs a second VLM pass on the zonal crop of the Recodage column.

    Saves the crop next to the source image (suffix `_recodage.png`) for
    audit. Returns a dict with `dp/dr/das` + metadata, or None on OCR or
    parsing failure.
    """
    try:
        img = Image.open(image_path)
        w, h = img.size
        z = zone
        if z is None:
            z, _ = resolve_recueil_zones()
        x1, y1, x2, y2 = z
        crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
        crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
        crop.save(crop_path)
    except Exception:
        return None

    t0 = time.time()
    res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
    parsed = parse_json_output(res["text"])
    if not isinstance(parsed, dict) or "_parse_error" in parsed:
        return None

    codes = filter_cim10_codes(parsed.get("codes") or [])
    dp, dr, das = classify_codes_dp_dr_das(codes)
    return {
        "dp": dp, "dr": dr, "das": das,
        "_source": "crop_recodage",
        "_elapsed_s": round(res["elapsed_s"], 2),
        "_n_codes_raw": len(parsed.get("codes") or []),
        "_n_codes_kept": len(codes),
    }


def merge_codage_reco(parsed: dict, reco: dict) -> None:
    """Merges the Recodage crop result into `parsed["codage_reco"]`.

    Merge policy: the crop is more reliable (isolated context), so it takes
    precedence over the main pass. Exception: when a crop field is empty but
    the main pass filled it, we keep the main pass value (we never degrade
    an existing result).
    """
    existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
    parsed["codage_reco"] = {
        "dp": reco.get("dp", "") or existing.get("dp", ""),
        "dr": reco.get("dr", "") or existing.get("dr", ""),
        "das": reco.get("das") or existing.get("das") or [],
    }
    parsed.setdefault("_crop_recodage", {})["result"] = reco
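The merge policy in action — the crop wins except where it would blank a field the main pass read (values are illustrative):

from pipeline.recueil import merge_codage_reco

parsed = {"codage_reco": {"dp": "I10", "dr": "K52", "das": []}}
reco = {"dp": "I10", "dr": "", "das": [{"code": "E11", "position": "2"}]}
merge_codage_reco(parsed, reco)
# dr is kept from the main pass (the crop left it empty), das from the crop:
assert parsed["codage_reco"] == {"dp": "I10", "dr": "K52",
                                 "das": [{"code": "E11", "position": "2"}]}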
# ============================================================
# Post-extraction enrichment of a recueil page
# ============================================================

def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR,
                   cb_zones: CheckboxZones | None = None) -> dict:
    """Enriches a parsed recueil JSON with:
    - the accord/désaccord checkbox (pixel-density method, VLM-independent)
    - normalization of `ghs_injustifie` → 0 / 1 / ""
    - a second VLM pass on the Recodage crop if needed, merged into `codage_reco`

    Modifies `parsed` in place and returns it (convenient for chaining).
    """
    if not isinstance(parsed, dict):
        return parsed
    zones = cb_zones or resolve_recueil_zones()[1]

    # Accord / désaccord checkboxes
    cb = detect_accord_desaccord(image_path, zones)
    parsed["accord_desaccord"] = cb["decision"]
    parsed["_checkbox_debug"] = cb

    # ghs_injustifie normalization
    parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))

    # Second Recodage pass
    reco = run_recodage_crop_pass(image_path, ocr)
    if reco:
        merge_codage_reco(parsed, reco)

    return parsed
@@ -27,7 +27,44 @@ if str(_REPO_ROOT) not in sys.path:
import streamlit as st
from PIL import Image

# ----------------------------------------------------------------------------
# Compatibility shim: streamlit-drawable-canvas 0.9.3 uses the private API
# `streamlit.elements.image.image_to_url`, which was removed from
# Streamlit ≈ 1.49 onwards. We re-inject an equivalent implementation based
# on a base64 data URI, which lets the canvas keep working without
# downgrading Streamlit globally.
#
# Replace this shim with an upgrade of streamlit-drawable-canvas if a
# version > 0.9.3 is ever released.
# ----------------------------------------------------------------------------
import base64 as _b64
import io as _io
from streamlit.elements import image as _st_image  # type: ignore

if not hasattr(_st_image, "image_to_url"):
    def _image_to_url_compat(image, width, clamp, channels, output_format,
                             image_id):
        """Converts a PIL.Image into a data URI compatible with drawable-canvas."""
        fmt = (output_format or "PNG").upper()
        if fmt == "JPG":
            fmt = "JPEG"
        buf = _io.BytesIO()
        image.save(buf, format=fmt)
        b64 = _b64.b64encode(buf.getvalue()).decode("ascii")
        mime = "image/jpeg" if fmt == "JPEG" else f"image/{fmt.lower()}"
        return f"data:{mime};base64,{b64}"

    _st_image.image_to_url = _image_to_url_compat  # type: ignore[attr-defined]
# ----------------------------------------------------------------------------

from pipeline.ingest import pdf_to_images
from pipeline.zones_config import load_config, save_config, DEFAULT_CONFIG_PATH

try:
    from streamlit_drawable_canvas import st_canvas
    _HAS_CANVAS = True
except ImportError:
    _HAS_CANVAS = False


# ============================================================
@@ -268,6 +305,130 @@ def render_page_editor(name: str, ptype: str, extract: dict, gold: dict | None):
        st.code(page_meta.get("ocr_raw", ""), language="json")


def render_calibration_page():
    """'Zone calibration' mode: draw rectangles with the mouse on a reference
    image, then save them to pipeline/zones_config.json."""
    st.header("🔧 Zone calibration")

    if not _HAS_CANVAS:
        st.error(
            "The `streamlit-drawable-canvas` package is not installed.\n"
            "Install it with: `pip install streamlit-drawable-canvas`"
        )
        return

    pdfs = list_pdfs()
    if not pdfs:
        st.error("No PDF available for calibration")
        return

    col_ctrl, _ = st.columns([1, 3])
    with col_ctrl:
        ref_name = st.selectbox(
            "Reference PDF (well aligned)",
            [p.stem for p in pdfs], key="calib_pdf",
        )
        page_type = st.selectbox(
            "Page type", ["recueil"],
            help="For now only the recueil page has configurable zones",
        )
    # Page number depending on the type (recueil = page 1)
    page_num = {"recueil": 1}.get(page_type, 1)

    ref_pdf = next(p for p in pdfs if p.stem == ref_name)
    img_path = pdf_to_images(str(ref_pdf))[page_num - 1]
    img = Image.open(img_path)
    img_w, img_h = img.size

    # Load the existing config and prepare the zones
    cfg = load_config()
    existing_zones = cfg.get(page_type, {})

    # Scale the image to fit in the canvas (~900 px max width)
    canvas_w = 900
    scale = canvas_w / img_w
    canvas_h = int(img_h * scale)

    # Prepare the initial rectangles from the config
    initial_rects = []
    for zone_name, z in existing_zones.items():
        if not isinstance(z, dict):
            continue
        initial_rects.append({
            "type": "rect",
            "left": z["x1"] * canvas_w,
            "top": z["y1"] * canvas_h,
            "width": (z["x2"] - z["x1"]) * canvas_w,
            "height": (z["y2"] - z["y1"]) * canvas_h,
            "fill": "rgba(255, 100, 100, 0.15)",
            "stroke": "red",
            "strokeWidth": 2,
            "label_name": zone_name,
        })

    st.caption(
        "💡 Draw one rectangle per zone with the mouse. Existing zones appear "
        "pre-drawn. You can modify them (drag), add new ones, or delete them "
        "(Delete key), then click **Save**."
    )

    drawing_mode = st.radio(
        "Mode", ["rect", "transform"], horizontal=True,
        format_func=lambda x: {"rect": "✏️ Draw", "transform": "🖱 Select / Move"}[x],
        key="calib_drawing_mode",
    )

    canvas_result = st_canvas(
        fill_color="rgba(255, 100, 100, 0.15)",
        stroke_width=2,
        stroke_color="red",
        background_image=img,
        update_streamlit=True,
        width=canvas_w,
        height=canvas_h,
        drawing_mode=drawing_mode,
        initial_drawing={"objects": initial_rects, "version": "5.2.1"},
        key="calib_canvas",
    )

    # Rebuild the config from the drawn rectangles
    rects = (canvas_result.json_data or {}).get("objects", []) if canvas_result.json_data else []

    st.markdown("### Detected zones")
    if not rects:
        st.info("No rectangle drawn.")
        return

    new_zones = {}
    for i, r in enumerate(rects):
        if r.get("type") != "rect":
            continue
        # Reuse the existing name when present, otherwise ask
        default_name = r.get("label_name") or f"zone_{i+1}"
        name = st.text_input(
            f"Zone {i+1} name",
            value=default_name, key=f"calib_name_{i}",
        )
        x1 = r["left"] / canvas_w
        y1 = r["top"] / canvas_h
        x2 = x1 + r["width"] / canvas_w
        y2 = y1 + r["height"] / canvas_h
        desc = existing_zones.get(name, {}).get("description", "")
        desc = st.text_input(
            "Description (optional)", value=desc, key=f"calib_desc_{i}",
        )
        st.caption(f"Relative coords: ({x1:.3f}, {y1:.3f}) → ({x2:.3f}, {y2:.3f})")
        new_zones[name] = {"x1": round(x1, 4), "y1": round(y1, 4),
                           "x2": round(x2, 4), "y2": round(y2, 4),
                           "description": desc}

    if st.button("💾 Save configuration", type="primary"):
        cfg[page_type] = new_zones
        path = save_config(cfg)
        st.success(f"Configuration saved: {path}")
        st.json(new_zones)


def main():
    st.set_page_config(page_title="OGC Overlay", layout="wide")

@@ -280,6 +441,13 @@ def main():

    st.title("🩺 OGC extraction — review & gold set")

    # Mode selector at the top of the sidebar
    with st.sidebar:
        mode = st.radio("Mode", ["📋 Review dossier", "🔧 Calibration zones"])
    if mode == "🔧 Calibration zones":
        render_calibration_page()
        return

    pdfs = list_pdfs()
    if not pdfs:
        st.error(f"No PDF found in {PDF_DIR}")
pipeline/zones_config.py — new file (+90 lines)
@@ -0,0 +1,90 @@
"""Extraction-zone configuration, editable through the overlay UI.

Coordinates are relative (0..1) within the source image. They are loaded at
pipeline startup and used in place of the hard-coded constants in
`pipeline/prompts.py` and `pipeline/checkboxes.py` — with a fallback to those
constants when no config is present, so nothing existing breaks.

Structure:
{
  "recueil": {
    "codage_reco": {"x1":0.77, "y1":0.330, "x2":0.97, "y2":0.490, "description":"..."},
    "accord_checkbox": {"x1":..., "y1":..., "x2":..., "y2":..., "description":"..."},
    "desaccord_checkbox":{...}
  },
  "concertation_2": {...}
}

A single `zones_config.json` file at the project root, or at the path pointed
to by the `OGC_ZONES_CONFIG` env variable.
"""
from __future__ import annotations

import json
import os
from pathlib import Path

DEFAULT_CONFIG_PATH = Path(
    os.environ.get("OGC_ZONES_CONFIG", "zones_config.json")
)


# Default zones, identical to the current constants in prompts.py and
# checkboxes.py. Serves both as the fallback and as the initial seed
# when the file does not exist yet.
DEFAULTS: dict = {
    "recueil": {
        "codage_reco": {
            "x1": 0.77, "y1": 0.330, "x2": 0.97, "y2": 0.490,
            "description": "Recodage column (DP / DR / DAS) — excludes the Acts block",
        },
        "accord_checkbox": {
            "x1": 0.588, "y1": 0.838, "x2": 0.622, "y2": 0.860,
            "description": "'Accord' checkbox",
        },
        "desaccord_checkbox": {
            "x1": 0.588, "y1": 0.858, "x2": 0.622, "y2": 0.880,
            "description": "'Désaccord' checkbox",
        },
    },
}


def load_config(path: Path = DEFAULT_CONFIG_PATH) -> dict:
    """Loads the JSON config, or returns the defaults when absent."""
    if not path.exists():
        return _deep_copy(DEFAULTS)
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return _deep_copy(DEFAULTS)
    # Merge: the defaults are the base, the user config layers on top
    merged = _deep_copy(DEFAULTS)
    for page, zones in raw.items():
        merged.setdefault(page, {}).update(zones)
    return merged


def save_config(cfg: dict, path: Path = DEFAULT_CONFIG_PATH) -> Path:
    path.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def get_zone(page_type: str, zone_name: str,
             config: dict | None = None) -> tuple[float, float, float, float] | None:
    """Fetches a zone from the config or the defaults.

    Returns (x1, y1, x2, y2) or None if unknown.
    """
    cfg = config or load_config()
    z = cfg.get(page_type, {}).get(zone_name)
    if not isinstance(z, dict):
        return None
    try:
        return (float(z["x1"]), float(z["y1"]), float(z["x2"]), float(z["y2"]))
    except (KeyError, ValueError, TypeError):
        return None


def _deep_copy(d: dict) -> dict:
    return json.loads(json.dumps(d))
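Reading a calibrated zone with fallback to the compiled defaults — a minimal sketch:

from pipeline.zones_config import get_zone, load_config

cfg = load_config()  # user file merged over DEFAULTS
zone = get_zone("recueil", "codage_reco", cfg)
if zone:
    x1, y1, x2, y2 = zone  # relative coords in [0, 1]; multiply by image size to crop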
tests/test_checkboxes.py — new file (+160 lines)
@@ -0,0 +1,160 @@
"""Unit tests for pipeline.checkboxes."""
from __future__ import annotations

import numpy as np
import pytest
from PIL import Image

from pipeline.checkboxes import (
    AMBIGU_MARGIN,
    CheckboxZones,
    RECUEIL_ACCORD_DESACCORD,
    dark_ratio,
    detect_accord_desaccord,
    parse_ghs_injustifie,
)


# ============================================================
# parse_ghs_injustifie
# ============================================================

class TestParseGhsInjustifie:
    @pytest.mark.parametrize("raw,expected", [
        ("0", "0"),
        ("1", "1"),
        ("0 SE 1 2 3 4 ATU FFM FSD", "0"),
        ("1 SE 2 ATU", "1"),
        (" 0 ", "0"),
        ("", ""),
        (None, ""),
        ("SE 1 2 3 4 ATU FFM FSD", ""),  # no leading digit
        ("abc", ""),
        ("2 SE 1", ""),  # 2 is neither 0 nor 1
    ])
    def test_cas_varies(self, raw, expected):
        assert parse_ghs_injustifie(raw) == expected


# ============================================================
# dark_ratio (with synthetic images)
# ============================================================

def _solid_image(w: int, h: int, gray_value: int = 255) -> Image.Image:
    arr = np.full((h, w), gray_value, dtype=np.uint8)
    return Image.fromarray(arr, mode="L").convert("RGB")


def _image_with_dark_square(w: int, h: int,
                            square_bbox: tuple[float, float, float, float]) -> Image.Image:
    """White image with a black square in the bbox zone (relative coords)."""
    arr = np.full((h, w), 255, dtype=np.uint8)
    x1, y1, x2, y2 = square_bbox
    arr[int(y1*h):int(y2*h), int(x1*w):int(x2*w)] = 0
    return Image.fromarray(arr, mode="L").convert("RGB")


class TestDarkRatio:
    def test_image_blanche(self):
        img = _solid_image(100, 100, 255)
        ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
        assert ratio == 0.0

    def test_image_noire(self):
        img = _solid_image(100, 100, 0)
        ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
        assert ratio == 1.0

    def test_inner_frac_ignore_les_bords(self):
        """A black square fills the whole zone, but with a large inner_frac
        we only look at the center, which is still inside the black zone."""
        img = _image_with_dark_square(100, 100, (0.0, 0.0, 1.0, 1.0))
        # All black, whatever inner_frac is
        assert dark_ratio(img, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35) == 1.0

    def test_cadre_seul_vs_contenu_central(self):
        """An 'empty' box (frame only) must have a low inner_frac ratio;
        a 'checked' box (cross in the center) must have a higher one."""
        # Simulate a frame: black along the border only
        w, h = 100, 100
        arr = np.full((h, w), 255, dtype=np.uint8)
        arr[:5, :] = 0; arr[-5:, :] = 0; arr[:, :5] = 0; arr[:, -5:] = 0
        frame_only = Image.fromarray(arr, mode="L").convert("RGB")
        # Frame + cross in the center
        arr2 = arr.copy()
        # A cross: 2 diagonals
        for i in range(20, 80):
            arr2[i, i] = 0
            arr2[i, 100 - 1 - i] = 0
        checked = Image.fromarray(arr2, mode="L").convert("RGB")

        ratio_empty = dark_ratio(frame_only, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)
        ratio_full = dark_ratio(checked, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)

        # The checked box must have a clearly higher ratio
        assert ratio_full > ratio_empty + 0.05


# ============================================================
# detect_accord_desaccord (cache fixtures)
# ============================================================

class TestDetectAccordDesaccord:
    """Tests on real cached images, with ground truth verified visually
    (cf. project history, crops audited one by one).

    The ground truth is indexed by OGC number — the mapping to the cache
    hash is resolved at runtime via pipeline.ingest.pdf_hash, to avoid
    hard-coding the hashes (fragile).
    """

    # Ground truth verified visually on the 18 "2018 CARC" dossiers
    GROUND_TRUTH_BY_OGC = {
        1: "accord",
        7: "accord",
        9: "désaccord",
        18: "désaccord",
        20: "désaccord",
        27: "désaccord",
        29: "accord",
        55: "accord",
        66: "désaccord",
        68: "accord",
        69: "accord",
        74: "désaccord",
        76: "désaccord",
        84: "accord",
        86: "désaccord",
        97: "accord",
        99: "désaccord",
    }

    @pytest.fixture
    def cached_pages_with_truth(self):
        """Resolves the OGC number → page_01.png mapping available at runtime."""
        from pathlib import Path
        from pipeline.ingest import pdf_hash
        pdf_dir = Path("2018 CARC")
        if not pdf_dir.is_dir():
            pytest.skip("2018 CARC/ directory missing")
        found = {}
        for n, expected in self.GROUND_TRUTH_BY_OGC.items():
            pdf = pdf_dir / f"OGC {n}.pdf"
            if not pdf.exists():
                continue
            h = pdf_hash(str(pdf))
            img = Path(f".cache/images/{h}/page_01.png")
            if img.exists():
                found[f"OGC {n}"] = (str(img), expected)
        if not found:
            pytest.skip("no image cache available — run the pipeline first")
        return found

    def test_ground_truth_echantillon(self, cached_pages_with_truth):
        """On the visually verified cases, the detector must match."""
        errors = []
        for name, (path, expected) in cached_pages_with_truth.items():
            r = detect_accord_desaccord(path)
            if r["decision"] != expected:
                errors.append(f"{name}: expected={expected}, got={r}")
        assert not errors, "\n".join(errors)
tests/test_deskew.py — new file (+140 lines)
@@ -0,0 +1,140 @@
"""Unit tests for pipeline.deskew.

GPU-free tests: synthetic images are generated in code, and the cached
images are used for the real-world cases.
"""
from __future__ import annotations

import math
from pathlib import Path

import numpy as np
import pytest
from PIL import Image

from pipeline.deskew import (
    MAX_ANGLE_DEG,
    MIN_ANGLE_DEG,
    NEAR_HORIZONTAL_BAND,
    deskew_image,
    detect_skew_angle,
)


# ============================================================
# Helpers: build a synthetic image with lines
# ============================================================

def _make_grid_image(w: int = 800, h: int = 1000,
                     n_lines: int = 30, angle_deg: float = 0.0) -> Image.Image:
    """Create a white image with `n_lines` evenly spaced horizontal lines,
    optionally rotated by a given angle. Ideal for exercising the detector.
    """
    arr = np.ones((h, w), dtype=np.uint8) * 255
    for i in range(1, n_lines + 1):
        y = int(i * h / (n_lines + 1))
        arr[y - 1:y + 1, 50:w - 50] = 0  # 2 px black horizontal line
    img = Image.fromarray(arr, mode="L")
    if angle_deg != 0.0:
        # PIL rotate: a positive angle is trigonometric (counter-clockwise).
        # We test with our convention (positive = clockwise), so we negate
        # here for consistency with detect_skew_angle.
        img = img.rotate(-angle_deg, resample=Image.Resampling.BICUBIC,
                         expand=False, fillcolor="white")
    return img.convert("RGB")


# ============================================================
# Detection tests
# ============================================================

class TestDetectSkewAngle:
    def test_image_parfaitement_droite(self):
        img = _make_grid_image()
        angle = detect_skew_angle(img)
        assert abs(angle) < 0.1, f"a straight image should give ~0°, got {angle}"

    @pytest.mark.parametrize("input_angle", [1.0, 2.0, -3.0, 4.0])
    def test_detecte_angles_modérés(self, input_angle):
        """On our synthetic image (30 lines), the sensitivity is ~1°.
        On real OGC sheets with 300+ table lines, the sensitivity
        goes down to 0.3° (cf. the real-world test on OGC 1: +0.91° detected).
        """
        img = _make_grid_image(angle_deg=input_angle)
        detected = detect_skew_angle(img)
        assert abs(detected - input_angle) < 0.5, \
            f"expected ~{input_angle}°, detected {detected}°"

    def test_image_sans_lignes_retourne_zero(self):
        # Fully uniform image → no detectable line
        arr = np.ones((500, 500), dtype=np.uint8) * 255
        img = Image.fromarray(arr, mode="L").convert("RGB")
        assert detect_skew_angle(img) == 0.0

    def test_angle_extrême_rejeté(self):
        # A 45° rotation exceeds MAX_ANGLE_DEG → we refuse to correct
        img = _make_grid_image(angle_deg=45.0)
        detected = detect_skew_angle(img)
        # Either 0.0 (no near-horizontal lines within ±15°), or bounded
        assert abs(detected) < MAX_ANGLE_DEG or detected == 0.0


# ============================================================
# Correction tests (deskew_image)
# ============================================================

class TestDeskewImage:
    def test_image_droite_inchangée(self):
        img = _make_grid_image()
        rotated, applied = deskew_image(img)
        assert applied == 0.0
        # Bit-for-bit identity
        assert np.array_equal(np.array(rotated), np.array(img))

    def test_image_inclinée_corrigée(self):
        img = _make_grid_image(angle_deg=2.0)
        rotated, applied = deskew_image(img)
        # We expect an applied angle close to 2° (positive convention)
        assert abs(applied) > MIN_ANGLE_DEG, \
            f"should correct, got applied={applied}"
        # After rotation, the residual angle must be very small
        residual = detect_skew_angle(rotated)
        assert abs(residual) < 0.5, \
            f"residual angle too large after correction: {residual}°"

    def test_seuil_min_angle_respecté(self):
        # A skew just under the threshold must not be corrected
        img = _make_grid_image(angle_deg=MIN_ANGLE_DEG / 2)
        _, applied = deskew_image(img)
        assert applied == 0.0

    def test_angle_forcé(self):
        """An arbitrary angle can be forced, independently of detection."""
        img = _make_grid_image()  # straight
        rotated, applied = deskew_image(img, angle=5.0)
        assert applied == 5.0
        # Size preserved
        assert rotated.size == img.size
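

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# Given the (rotated, applied) contract checked above, deskew_image
# presumably reduces to a threshold guard plus a single PIL rotate; positive
# angles are clockwise in our convention while PIL counts counter-clockwise,
# hence the sign flip. `_apply_deskew_sketch` is a hypothetical name; the
# real deskew_image may differ.
def _apply_deskew_sketch(img: Image.Image, angle: float) -> tuple[Image.Image, float]:
    if abs(angle) < MIN_ANGLE_DEG:
        return img, 0.0  # below threshold: returned bit-for-bit unchanged
    rotated = img.rotate(-angle, resample=Image.Resampling.BICUBIC,
                         expand=False, fillcolor="white")
    return rotated, angle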


# ============================================================
# Tests with real fixtures (when the cache is available)
# ============================================================

class TestOnRealCachedPages:
    """These tests run only if the image cache exists."""

    @pytest.fixture
    def cached_pages(self):
        paths = sorted(Path(".cache/images").glob("*/page_01.png"))
        if not paths:
            pytest.skip("no image cache available")
        return paths

    def test_detection_ne_crash_pas(self, cached_pages):
        """detect_skew_angle must not crash on any cached page."""
        for p in cached_pages[:5]:  # capped for speed
            img = Image.open(p)
            angle = detect_skew_angle(img)
            assert isinstance(angle, float)
            assert abs(angle) <= MAX_ANGLE_DEG
119
tests/test_json_utils.py
Normal file
@@ -0,0 +1,119 @@
"""Unit tests for pipeline.json_utils."""
from __future__ import annotations

from pipeline.json_utils import (
    close_open_json,
    parse_json_output,
    patch_missing_commas,
    strip_fences,
    truncate_empty_loop,
)


class TestStripFences:
    def test_fence_json(self):
        raw = '```json\n{"a": 1}\n```'
        assert strip_fences(raw).strip() == '{"a": 1}'

    def test_fence_simple(self):
        raw = '```\n{"a": 1}\n```'
        assert strip_fences(raw).strip() == '{"a": 1}'

    def test_pas_de_fence(self):
        raw = '{"a": 1}'
        assert strip_fences(raw).strip() == '{"a": 1}'


class TestPatchMissingCommas:
    def test_objets_consecutifs(self):
        raw = '[\n{"a": 1}\n{"b": 2}\n]'
        patched = patch_missing_commas(raw)
        assert '},' in patched

    def test_deja_correct(self):
        raw = '{"a": 1}'
        assert patch_missing_commas(raw) == raw


class TestTruncateEmptyLoop:
    def test_moins_que_seuil(self):
        raw = '[{"code":"","position":""},{"code":"","position":""}]'
        # 2 empty objects = the default threshold, nothing to truncate
        out = truncate_empty_loop(raw, max_consecutive=2)
        assert out == raw

    def test_boucle_tronquée(self):
        objs = ['{"code":"","position":""}'] * 10
        raw = '[' + ','.join(objs)
        out = truncate_empty_loop(raw, max_consecutive=2)
        # After truncation, only 2 occurrences must remain
        assert out.count('{"code":""') == 2

    def test_pas_de_boucle(self):
        raw = '[{"code":"K650","position":"1"}]'
        assert truncate_empty_loop(raw) == raw


class TestCloseOpenJson:
    def test_deja_ferme(self):
        raw = '{"a": [1, 2]}'
        assert close_open_json(raw) == raw

    def test_accolade_manquante(self):
        raw = '{"a": 1'
        closed = close_open_json(raw)
        assert closed == '{"a": 1}'

    def test_crochet_manquant(self):
        raw = '{"a": [1, 2'
        closed = close_open_json(raw)
        assert closed == '{"a": [1, 2]}'

    def test_accolades_et_crochets_imbriqués(self):
        raw = '{"a": {"b": [1, 2'
        closed = close_open_json(raw)
        assert closed == '{"a": {"b": [1, 2]}}'

    def test_virgule_trainante_supprimée(self):
        raw = '{"a": 1, '
        closed = close_open_json(raw)
        assert closed == '{"a": 1}'

    def test_accolade_dans_string_ignorée(self):
        raw = '{"a": "{ ceci est une { accolade dans une string"'
        closed = close_open_json(raw)
        # Only the missing final brace is appended
        assert closed == raw + '}'
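

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# The behaviour pinned down above suggests a single string-aware pass that
# tracks open braces/brackets on a stack and appends the missing closers.
# `_close_open_json_sketch` is a hypothetical name; the real close_open_json
# may differ.
def _close_open_json_sketch(raw: str) -> str:
    stack, in_string, escaped = [], False, False
    for ch in raw:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]" and stack:
            stack.pop()
    # Drop a trailing comma, then close whatever is still open
    return raw.rstrip().rstrip(",") + "".join(reversed(stack))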


class TestParseJsonOutput:
    def test_json_valide(self):
        assert parse_json_output('{"a": 1}') == {"a": 1}

    def test_vide(self):
        assert parse_json_output("") is None
        assert parse_json_output(None) is None

    def test_fences_markdown(self):
        assert parse_json_output('```json\n{"a": 1}\n```') == {"a": 1}

    def test_virgule_manquante_recuperee(self):
        raw = '[\n{"a": 1}\n{"b": 2}\n]'
        result = parse_json_output(raw)
        assert result == [{"a": 1}, {"b": 2}]

    def test_boucle_tronquée_fermée(self):
        objs = ['{"code":"","position":"","libelle":""}'] * 10
        raw = '{"das": [\n' + ',\n'.join(objs)  # not closed
        result = parse_json_output(raw)
        assert isinstance(result, dict)
        assert "das" in result
        # After truncation: at most 2 empty objects, then the JSON is re-closed
        assert result.get("_truncated_loop") is True

    def test_fallback_retourne_raw(self):
        """When nothing works, return a dict with _raw + _parse_error."""
        raw = "this is not JSON at all!"
        result = parse_json_output(raw)
        assert result.get("_raw") == raw
        assert "_parse_error" in result
145
tests/test_recueil.py
Normal file
@@ -0,0 +1,145 @@
"""Unit tests for pipeline.recueil (business logic of the recueil page).

The functions tested here are all pure (no VLM call):
- filter_cim10_codes
- classify_codes_dp_dr_das
- merge_codage_reco
- resolve_recueil_zones (config reading only)
"""
from __future__ import annotations

from pipeline.recueil import (
    classify_codes_dp_dr_das,
    filter_cim10_codes,
    merge_codage_reco,
    resolve_recueil_zones,
)


class TestFilterCim10Codes:
    def test_codes_valides_conservés(self):
        codes = [
            {"code": "K650", "position": "1"},
            {"code": "T814", "position": "2"},
            {"code": "Z954 *", "position": "3"},
        ]
        out = filter_cim10_codes(codes)
        assert len(out) == 3
        assert out[0]["code"] == "K650"

    def test_ccam_rejeté(self):
        """A CCAM code (4 letters + 3 digits) must not pass the CIM-10 filter."""
        codes = [
            {"code": "K650", "position": ""},
            {"code": "EBFA012", "position": "1"},  # CCAM
        ]
        out = filter_cim10_codes(codes)
        assert len(out) == 1
        assert out[0]["code"] == "K650"

    def test_code_vide_rejeté(self):
        codes = [{"code": "", "position": ""}, {"code": "K650", "position": ""}]
        out = filter_cim10_codes(codes)
        assert len(out) == 1

    def test_non_dict_ignoré(self):
        codes = ["K650", None, {"code": "T814", "position": ""}]
        out = filter_cim10_codes(codes)
        assert len(out) == 1

    def test_liste_vide(self):
        assert filter_cim10_codes([]) == []
        assert filter_cim10_codes(None) == []


class TestClassifyCodesDpDrDas:
    def test_cas_nominal(self):
        """1st code without a position = DP, 2nd = DR, then DAS with positions."""
        codes = [
            {"code": "K650", "position": ""},
            {"code": "T814", "position": ""},
            {"code": "Z954", "position": "2"},
            {"code": "R33", "position": "3"},
        ]
        dp, dr, das = classify_codes_dp_dr_das(codes)
        assert dp == "K650"
        assert dr == "T814"
        assert [d["code"] for d in das] == ["Z954", "R33"]

    def test_dr_vide_non_duplique_dp(self):
        """When Qwen duplicates the DP (because the DR is visually empty),
        the DR must be treated as empty, not as DR = DP."""
        codes = [
            {"code": "K650", "position": ""},
            {"code": "K650", "position": ""},  # duplicate
            {"code": "T814", "position": "2"},
        ]
        dp, dr, das = classify_codes_dp_dr_das(codes)
        assert dp == "K650"
        assert dr == ""  # deduplicated
        assert len(das) == 1

    def test_seulement_dp(self):
        codes = [{"code": "K650", "position": ""}]
        dp, dr, das = classify_codes_dp_dr_das(codes)
        assert dp == "K650"
        assert dr == ""
        assert das == []

    def test_tous_avec_positions(self):
        """If every code has a position, DP and DR stay empty; everything is DAS."""
        codes = [
            {"code": "K650", "position": "1"},
            {"code": "T814", "position": "2"},
        ]
        dp, dr, das = classify_codes_dp_dr_das(codes)
        assert dp == ""
        assert dr == ""
        assert len(das) == 2

    def test_vide(self):
        dp, dr, das = classify_codes_dp_dr_das([])
        assert (dp, dr, das) == ("", "", [])
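

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# The classification rule pinned down above, as a compact reference.
# `_classify_sketch` is a hypothetical name; the real classify_codes_dp_dr_das
# may differ in its details.
def _classify_sketch(codes):
    dp, dr, das = "", "", []
    for c in codes:
        if c.get("position"):
            das.append(c)           # positioned codes are associated diagnoses
        elif not dp:
            dp = c["code"]          # first unpositioned code = DP
        elif not dr and c["code"] != dp:
            dr = c["code"]          # second one = DR, unless it duplicates DP
    return dp, dr, das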


class TestMergeCodageReco:
    def test_crop_prime_sur_passage_principal(self):
        parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
        reco = {"dp": "K650", "dr": "T814",
                "das": [{"code": "Z954", "position": "2"}]}
        merge_codage_reco(parsed, reco)
        assert parsed["codage_reco"]["dp"] == "K650"
        assert parsed["codage_reco"]["dr"] == "T814"
        assert len(parsed["codage_reco"]["das"]) == 1

    def test_crop_vide_garde_passage_principal(self):
        """If the crop leaves a field empty that the main pass had filled,
        we must not degrade: the main pass value is kept."""
        parsed = {"codage_reco": {"dp": "K650", "dr": "", "das": []}}
        reco = {"dp": "", "dr": "", "das": []}
        merge_codage_reco(parsed, reco)
        assert parsed["codage_reco"]["dp"] == "K650"  # preserved

    def test_codage_reco_initialement_absent(self):
        parsed = {}
        reco = {"dp": "K650", "dr": "", "das": []}
        merge_codage_reco(parsed, reco)
        assert parsed["codage_reco"]["dp"] == "K650"

    def test_trace_crop_ajoutee(self):
        parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
        reco = {"dp": "K650", "_elapsed_s": 1.5}
        merge_codage_reco(parsed, reco)
        assert parsed["_crop_recodage"]["result"]["_elapsed_s"] == 1.5


class TestResolveRecueilZones:
    def test_fallback_constantes(self):
        """Without a user config, the default zones apply."""
        reco, cb = resolve_recueil_zones()
        # 4 float coordinates
        assert len(reco) == 4
        assert all(isinstance(v, float) for v in reco)
        # Checkbox zones
        assert len(cb.accord) == 4
        assert len(cb.desaccord) == 4
118
tests/test_schema.py
Normal file
@@ -0,0 +1,118 @@
"""Unit tests for pipeline.schema (JSON cleanup)."""
from __future__ import annotations

from pipeline.schema import (
    CLEAN_FIELDS_RECUEIL,
    DEBUG_FIELDS,
    SCHEMA_VERSION,
    clean_dossier,
)


def _sample_raw():
    """A typical pipeline JSON, rich in debug fields."""
    return {
        "fichier": "OGC 7",
        "pdf_hash": "abc123",
        "pages": [{"page": 1, "type": "recueil"}],
        "extraction": {
            "recueil": {
                "etablissement": "CLINIQUE X",
                "finess": "330780206",
                "ghm_etab": "11M122",
                "ghs_etab": "4323",
                "codage_etab": {"dp": "K650"},
                "accord_desaccord": "accord",
                "_checkbox_debug": {"ratio_accord": 0.38, "ratio_desaccord": 0.19},
                "_parse_error": "whatever",
                "_truncated_loop": True,
                "_crop_recodage": {"dp": "K650", "_source": "crop"},
                "_validation": {
                    "summary": {"valid": 3, "invalid": 0, "empty": 2, "total_codes": 3},
                    "cross_checks": {
                        "etab": {"checked": True, "coherent": True},
                        "reco": {"checked": False, "reason": "ghm manquant"},
                    },
                    "codage_etab": {
                        "dp": {"code": "K650", "valid": True, "libelle_ref": "Péritonite"},
                        "dr": {"code": "", "valid": None},
                        "das": [],
                    },
                    "codage_reco": {"dp": {}, "dr": {}, "das": []},
                    "ghm_etab": {"code": "11M122", "valid": True,
                                 "ghs_possibles": ["4323"]},
                    "ghs_etab": {"code": "4323", "valid": True},
                    "ghm_reco": {"code": "", "valid": None},
                    "ghs_reco": {"code": "", "valid": None},
                },
            },
            "concertation_2": {
                "ghs_initial": "4323",
                "ghs_final": "4323",
                "decision": "retour_groupage_dim",
                "date_concertation": "13/03/2018",
            },
        },
        "_meta": {"pipeline_version": "v2", "ocr_model": "Qwen/Qwen2.5-VL-3B-Instruct"},
    }


class TestCleanDossier:
    def test_retourne_schema_version(self):
        out = clean_dossier(_sample_raw())
        assert out["schema_version"] == SCHEMA_VERSION

    def test_retire_tous_les_champs_debug(self):
        """No field from DEBUG_FIELDS may remain in the clean output."""
        out = clean_dossier(_sample_raw())
        rec = out["extraction"]["recueil"]
        for debug_field in DEBUG_FIELDS:
            assert debug_field not in rec, \
                f"{debug_field} should have been removed"

    def test_garde_les_champs_metier(self):
        out = clean_dossier(_sample_raw())
        rec = out["extraction"]["recueil"]
        for f in ["etablissement", "finess", "ghm_etab", "ghs_etab",
                  "codage_etab", "accord_desaccord"]:
            assert f in rec, f"{f} must be present in the clean output"

    def test_validation_compactee(self):
        """Validation is kept, but in a compact format."""
        out = clean_dossier(_sample_raw())
        v = out["extraction"]["recueil"]["_validation"]
        # summary is kept as-is
        assert v["summary"]["valid"] == 3
        # cross_checks compacted: just the coherent boolean (or None)
        assert v["cross_checks"] == {
            "etab_ghm_ghs_coherent": True,
            "reco_ghm_ghs_coherent": None,
        }
        # Validated codes keep libelle_ref when available
        assert v["codage_etab"]["dp"]["valid"] is True
        assert v["codage_etab"]["dp"].get("libelle_ref") == "Péritonite"

    def test_concertation_2_conservee(self):
        out = clean_dossier(_sample_raw())
        c2 = out["extraction"]["concertation_2"]
        assert c2["ghs_initial"] == "4323"
        assert c2["decision"] == "retour_groupage_dim"

    def test_champs_inconnus_ignorés(self):
        """A field that is not in CLEAN_FIELDS_RECUEIL is removed."""
        raw = _sample_raw()
        raw["extraction"]["recueil"]["champ_inventé"] = "poubelle"
        out = clean_dossier(raw)
        assert "champ_inventé" not in out["extraction"]["recueil"]

    def test_meta_preservee(self):
        out = clean_dossier(_sample_raw())
        assert out["_meta"]["pipeline_version"] == "v2"
        assert "Qwen" in out["_meta"]["ocr_model"]

    def test_pas_de_modification_input(self):
        """The function must not mutate its input."""
        raw = _sample_raw()
        before = raw["extraction"]["recueil"].copy()
        _ = clean_dossier(raw)
        assert raw["extraction"]["recueil"] == before
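

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# The cross_checks compaction asserted in test_validation_compactee, as a
# compact reference. `_compact_cross_checks_sketch` is a hypothetical name;
# the real clean_dossier may differ.
def _compact_cross_checks_sketch(cross_checks: dict) -> dict:
    return {
        # tri-state: True/False when checked, None when the check did not run
        f"{name}_ghm_ghs_coherent": cc.get("coherent") if cc.get("checked") else None
        for name, cc in cross_checks.items()
    }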
146
tests/test_validation.py
Normal file
@@ -0,0 +1,146 @@
"""Unit tests for pipeline.validation."""
from __future__ import annotations

import pytest

from pipeline.validation import (
    _check_ccam,
    _check_cim10,
    _check_ghm,
    _check_ghs,
    _cross_check_ghm_ghs,
    annotate,
    validate_recueil,
)


# ============================================================
# Checks by code type
# ============================================================

class TestCheckCim10:
    def test_code_valide(self):
        r = _check_cim10("K650")
        assert r["valid"] is True
        assert "libelle_ref" in r

    def test_code_vide(self):
        assert _check_cim10("")["valid"] is None
        assert _check_cim10(None)["valid"] is None

    def test_code_avec_suffixe_pmsi(self):
        # The * and +N suffixes are handled by normalization
        r = _check_cim10("C795 *")
        assert r["valid"] is True

    def test_code_invalide_avec_suggestion(self):
        # K65O (letter O instead of zero) does not exist, but K650 does
        r = _check_cim10("K65O")
        assert r["valid"] is False
        assert r.get("suggestion") == "K650"

    def test_code_invalide_sans_suggestion(self):
        # Nonsense code with no close neighbour
        r = _check_cim10("ZZZZ9999")
        assert r["valid"] is False
        # the suggestion may be absent
        assert r.get("suggestion") is None or r.get("suggestion") != "ZZZZ9999"


class TestCheckGhm:
    def test_ghm_valide(self):
        r = _check_ghm("11M122")
        assert r["valid"] is True
        assert isinstance(r.get("ghs_possibles"), list)
        assert len(r["ghs_possibles"]) > 0

    def test_ghm_invalide(self):
        r = _check_ghm("99Z999")
        assert r["valid"] is False


class TestCheckGhs:
    def test_ghs_valide(self):
        assert _check_ghs("4323")["valid"] is True

    def test_ghs_invalide(self):
        assert _check_ghs("99999")["valid"] is False


class TestCheckCcam:
    def test_ccam_valide(self):
        assert _check_ccam("EBFA012")["valid"] is True

    def test_ccam_invalide(self):
        assert _check_ccam("XXXX000")["valid"] is False


# ============================================================
# Cross-checks GHM ↔ GHS
# ============================================================

class TestCrossCheckGhmGhs:
    def test_couple_coherent(self):
        # 11M122 does have 4323 among its possible GHS
        r = _cross_check_ghm_ghs("11M122", "4323")
        assert r["checked"] is True
        assert r["coherent"] is True

    def test_couple_incoherent(self):
        # 11M122 does not go with just any GHS
        r = _cross_check_ghm_ghs("11M122", "9999")
        assert r["checked"] is True
        assert r["coherent"] is False

    def test_ghm_manquant(self):
        r = _cross_check_ghm_ghs("", "4323")
        assert r["checked"] is False

    def test_ghm_invalide(self):
        r = _cross_check_ghm_ghs("99Z999", "4323")
        assert r["checked"] is False
        assert "invalide" in r["reason"].lower()
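

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# The cross-check contract pinned down above, as a compact reference: a
# GHM/GHS pair is coherent when the GHS appears in the GHM's list of possible
# GHS. `_cross_check_sketch` is a hypothetical name; the reason strings follow
# the pipeline's French convention, and the real _cross_check_ghm_ghs may
# differ.
def _cross_check_sketch(ghm: str, ghs: str) -> dict:
    if not ghm or not ghs:
        return {"checked": False, "reason": "code manquant"}
    ref = _check_ghm(ghm)
    if not ref["valid"]:
        return {"checked": False, "reason": "GHM invalide"}
    return {"checked": True, "coherent": ghs in ref.get("ghs_possibles", [])}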


# ============================================================
# annotate (integration)
# ============================================================

class TestAnnotate:
    def test_annotate_json_vide(self):
        out = annotate({"fichier": "TEST", "extraction": {}})
        assert "fichier" in out
        assert out["extraction"] == {}

    def test_annotate_recueil_complet(self):
        raw = {
            "fichier": "TEST",
            "extraction": {
                "recueil": {
                    "codage_etab": {"dp": "K650", "dr": "", "das": [
                        {"code": "T814", "position": "2"},
                    ]},
                    "codage_reco": {"dp": "", "dr": "", "das": []},
                    "ghm_etab": "11M122",
                    "ghs_etab": "4323",
                    "ghm_reco": "",
                    "ghs_reco": "",
                },
            },
        }
        out = annotate(raw)
        v = out["extraction"]["recueil"]["_validation"]
        assert v["codage_etab"]["dp"]["valid"] is True
        assert v["ghm_etab"]["valid"] is True
        assert v["cross_checks"]["etab"]["coherent"] is True
        assert v["summary"]["valid"] >= 3

    def test_annotate_preserve_source(self):
        """Annotation must not mutate the input (defensive copy)."""
        raw = {
            "fichier": "T",
            "extraction": {"recueil": {"codage_etab": {"dp": "K650"}}},
        }
        out = annotate(raw)
        assert "_validation" not in raw["extraction"]["recueil"]
        assert "_validation" in out["extraction"]["recueil"]
85
tests/test_zones_config.py
Normal file
@@ -0,0 +1,85 @@
"""Unit tests for pipeline.zones_config."""
from __future__ import annotations

import json

import pytest

from pipeline.zones_config import (
    DEFAULTS,
    get_zone,
    load_config,
    save_config,
)


class TestLoadConfig:
    def test_fichier_absent_retourne_defaults(self, tmp_path):
        cfg = load_config(tmp_path / "inexistant.json")
        assert cfg == DEFAULTS

    def test_charge_depuis_fichier(self, tmp_path):
        path = tmp_path / "zones.json"
        custom = {
            "recueil": {
                "codage_reco": {"x1": 0.5, "y1": 0.1, "x2": 0.9, "y2": 0.4,
                                "description": "test"},
            },
        }
        path.write_text(json.dumps(custom))
        cfg = load_config(path)
        assert cfg["recueil"]["codage_reco"]["x1"] == 0.5

    def test_merge_avec_defaults(self, tmp_path):
        """Zones not defined in the file fall back to the defaults."""
        path = tmp_path / "zones.json"
        partial = {
            "recueil": {"codage_reco": {"x1": 0.1, "y1": 0.2, "x2": 0.3, "y2": 0.4}},
        }
        path.write_text(json.dumps(partial))
        cfg = load_config(path)
        # User override applied
        assert cfg["recueil"]["codage_reco"]["x1"] == 0.1
        # Default kept for the other zone
        assert cfg["recueil"]["accord_checkbox"] == DEFAULTS["recueil"]["accord_checkbox"]

    def test_json_corrompu_retombe_sur_defaults(self, tmp_path):
        path = tmp_path / "corrupt.json"
        path.write_text("{ not valid json [")
        cfg = load_config(path)
        assert cfg == DEFAULTS
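

# --- Editor's sketch (illustration only; not exercised by the tests) -------
# The defaults-merge behaviour pinned down above: user values override per
# zone, and anything missing falls back to DEFAULTS. `_merge_sketch` is a
# hypothetical name; the real load_config may differ (e.g. merge more deeply).
def _merge_sketch(user: dict) -> dict:
    cfg = {page: dict(zones) for page, zones in DEFAULTS.items()}
    for page, zones in user.items():
        cfg.setdefault(page, {}).update(zones)
    return cfg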


class TestSaveConfig:
    def test_save_puis_load_round_trip(self, tmp_path):
        path = tmp_path / "zones.json"
        original = {
            "recueil": {
                "codage_reco": {"x1": 0.11, "y1": 0.22, "x2": 0.33, "y2": 0.44,
                                "description": "abc"},
            },
        }
        save_config(original, path)
        reloaded = load_config(path)
        assert reloaded["recueil"]["codage_reco"]["x1"] == 0.11
        assert reloaded["recueil"]["codage_reco"]["description"] == "abc"


class TestGetZone:
    def test_zone_existante(self):
        z = get_zone("recueil", "codage_reco")
        assert isinstance(z, tuple)
        assert len(z) == 4
        assert all(isinstance(v, float) for v in z)

    def test_zone_inconnue_retourne_none(self):
        assert get_zone("recueil", "zone_qui_nexiste_pas") is None
        assert get_zone("page_fantaisiste", "whatever") is None

    def test_config_explicite(self):
        cfg = {
            "recueil": {
                "my_zone": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0},
            },
        }
        assert get_zone("recueil", "my_zone", config=cfg) == (0.0, 0.0, 1.0, 1.0)