Compare commits

...

6 Commits

Author SHA1 Message Date
Dom
e55daf275e fix(ui): shim de compatibilité pour streamlit-drawable-canvas 0.9.3
streamlit-drawable-canvas 0.9.3 (dernière version disponible sur PyPI)
utilise l'API privée `streamlit.elements.image.image_to_url` qui a été
retirée à partir de Streamlit ≈ 1.49. Sur Streamlit 1.56 (installé ici),
le canvas plante à l'ouverture du mode "🔧 Calibration zones" :

  AttributeError: module 'streamlit.elements.image' has no attribute 'image_to_url'

Plutôt que de downgrader Streamlit globalement (impact sur les autres
features de l'overlay), on injecte une implémentation locale de
`image_to_url` au tout début de pipeline/ui_overlay.py si elle est absente.
L'implémentation produit un data URI base64 que le canvas consomme
directement côté navigateur, sans toucher au système de fichiers media.

À retirer dès qu'une version > 0.9.3 de streamlit-drawable-canvas
publiera un correctif officiel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 09:52:19 +02:00
Dom
3a87751444 test: couvrir les modules purs du pipeline (96 nouveaux tests)
Suite de tests unitaires pour tous les modules pipeline qui ne dépendent
pas du VLM — utiles pour garantir la non-régression après refactor et
servir de spec vivante de chaque fonction.

Fichiers :
- tests/test_json_utils.py   (20 tests) : parse_json_output + toutes les
  stratégies de récupération (fences, virgules manquantes, boucles vides,
  fermeture JSON, fallback _raw/_parse_error)
- tests/test_deskew.py       (11 tests) : détection Hough + correction,
  image synthétique + fixtures cache réel
- tests/test_checkboxes.py   (17 tests) : parse_ghs_injustifie,
  dark_ratio, inner_frac, et ground truth visuel sur 17 dossiers
  (mapping hash→OGC résolu au runtime pour éviter les constantes fragiles)
- tests/test_validation.py   (18 tests) : _check_cim10/ccam/ghm/ghs,
  cross-checks GHM↔GHS, annotate sur JSON vide et complet,
  preservation de l'input (copie défensive)
- tests/test_schema.py       (8 tests)  : clean_dossier retire les champs
  debug, préserve les champs métier, compacte la validation, ne modifie
  pas l'input
- tests/test_zones_config.py (8 tests)  : load/save round-trip, merge
  avec defaults, résilience JSON corrompu, get_zone

Total : 107 tests, 5.1 s d'exécution, tous passent. Aucune dépendance
GPU, s'exécutent en CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:29:23 +02:00
Dom
d326524e49 refactor(extract): décomposer en étages testables (json_utils + recueil)
extract.py contenait 4 responsabilités mélangées (320 lignes) :
parsing JSON tolérant, résolution de zones, crop Recodage avec classification
métier, orchestration. Séparation en modules cohérents :

- pipeline/json_utils.py : parsing tolérant réutilisable (strip fences,
  virgules manquantes, troncature des boucles d'objets vides, fermeture
  des structures JSON ouvertes). N'a aucune connaissance métier OGC.

- pipeline/recueil.py    : toute la logique spécifique à la page recueil —
  résolution de zones configurables, filter_cim10_codes, classification
  DP/DR/DAS par règle métier, run_recodage_crop_pass, merge_codage_reco,
  enrich_recueil (orchestration des trois : checkboxes + ghs_injustifie
  + crop Recodage). Chaque fonction est testable indépendamment du VLM.

- pipeline/extract.py    : réduit à l'orchestration pure — ingest, routing,
  boucle page par page, délégation à recueil.enrich_recueil, validation
  ATIH finale. Plus aucune logique métier enfouie.

La fonction extract_dossier garde exactement la même signature et produit
le même JSON en sortie : aucun breaking change externe.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:29:03 +02:00
Dom
1255468676 feat(ui): calibration visuelle des zones via dessin à la souris
Nouveau module pipeline/zones_config.py : charge les zones d'extraction
depuis un fichier zones_config.json (coordonnées relatives 0-1), avec
fallback sur les constantes Python. Config partagée entre :
- pipeline/extract.py (crop colonne Recodage)
- pipeline/checkboxes.py (cases Accord/Désaccord)

Zones configurables aujourd'hui (page recueil) :
- codage_reco (crop zonal pour le second passage VLM)
- accord_checkbox / desaccord_checkbox (densité de pixels)

Mode "🔧 Calibration zones" ajouté dans pipeline/ui_overlay.py :
- Sélection d'un PDF de référence (idéalement bien cadré)
- Canvas interactif (streamlit-drawable-canvas) avec les zones
  existantes pré-dessinées en rouge
- Dessin/déplacement/redimensionnement à la souris
- Saisie d'un nom et description par zone
- Sauvegarde en JSON (ou OGC_ZONES_CONFIG si défini)

Permet au métier (Khalid) de recalibrer les zones sans toucher au code,
par exemple si le formulaire ATIH évolue ou si les scans sont d'un autre
établissement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:59 +02:00
Dom
c0b0cd9b87 perf(ocr_qwen): support CPU + bfloat16 AVX-512 + threads explicites
Trois ajouts pour rendre le pipeline utilisable sur CPU quand la VRAM
est saturée par d'autres process :

1. Variable QWEN_DEVICE=cpu pour forcer le device CPU. Le défaut "auto"
   choisit CUDA si dispo, fallback CPU sinon.

2. Sur CPU, détection automatique du support AVX-512 BF16 via /proc/cpuinfo
   (Zen 4/5, Intel Sapphire Rapids+). Si présent, bfloat16 au lieu de
   float32 — divise par 2 la RAM et ~2x plus rapide sur matmul.

3. Appel explicite de torch.set_num_threads(N) et set_num_interop_threads(N)
   (OMP_NUM_THREADS seul ne suffit pas). Configurable via TORCH_NUM_THREADS,
   défaut = os.cpu_count().

Mesure sur Ryzen 9 9950X (Zen 5, 16c/32t, AVX-512 BF16 natif) :
- AVANT : 645% CPU (~6.5 cores), 15 Go RAM (float32)
- APRÈS : 2433% CPU (~24 cores), 8 Go RAM (bfloat16)

Appel `torch.cuda.empty_cache()` en fin d'inférence pour réduire la
fragmentation VRAM quand d'autres process GPU tournent en parallèle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:45 +02:00
Dom
6c8184cc03 feat(deskew): correction automatique du skew au chargement des PDFs
Nouveau module pipeline/deskew.py basé sur cv2.HoughLinesP :
- détecte les lignes quasi-horizontales (±15° de l'horizontale)
- prend la médiane de leurs angles (robuste aux outliers)
- seuils : |angle|>0.3° pour corriger, |angle|>10° = suspect (on ne corrige pas)
- PIL.rotate() avec BICUBIC + fillcolor blanc, sans expand

Intégré dans pipeline/ingest.py (paramètre `deskew=True` par défaut).
L'angle appliqué est tracé dans un fichier `page_XX.skew` à côté de
l'image, pour audit.

Mesuré sur les 18 dossiers de l'échantillon 2018 CARC : seule OGC 1 a
un skew au-dessus du seuil (+0.91°), les 17 autres sont déjà droits.
Le deskew corrige OGC 1 en 0.00° résiduel (vérif visuelle en-tête OK).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:07:29 +02:00
15 changed files with 1782 additions and 249 deletions

125
pipeline/deskew.py Normal file
View File

@@ -0,0 +1,125 @@
"""Détection d'angle de skew + redressement automatique des pages scannées.
Technique : Hough Transform sur les lignes détectées par Canny, puis moyenne
des angles des lignes « quasi horizontales » (±15° par rapport à l'horizontale).
Les fiches OGC ont énormément de traits de tableau → signal très fort.
Seuil : on ne corrige que si |angle| > `MIN_ANGLE_DEG` (0.3° par défaut) pour
éviter de toucher les scans déjà bien cadrés et introduire du bruit inutile.
"""
from __future__ import annotations
from pathlib import Path
from typing import Tuple
import numpy as np
from PIL import Image
try:
import cv2 # type: ignore
_HAS_CV2 = True
except ImportError:
_HAS_CV2 = False
MIN_ANGLE_DEG = 0.3 # en-dessous, on ne corrige pas
MAX_ANGLE_DEG = 10.0 # au-dessus, c'est anormal → suspect, on ne corrige pas
NEAR_HORIZONTAL_BAND = 15.0 # degrés : bande autour de l'horizontale pour filtrer
def detect_skew_angle(img: Image.Image) -> float:
"""Retourne l'angle de skew en degrés (positif = tourné dans le sens
des aiguilles d'une montre) à appliquer pour redresser l'image.
Si aucune ligne horizontale n'est trouvée, retourne 0.0.
Si l'angle détecté est hors [-MAX_ANGLE_DEG, +MAX_ANGLE_DEG], retourne 0.0
(probablement une erreur de détection, on ne corrige pas).
"""
if not _HAS_CV2:
return 0.0
gray = np.array(img.convert("L"))
# Réduire l'image pour accélérer (max 1500 px de large)
h, w = gray.shape
if w > 1500:
scale = 1500 / w
gray = cv2.resize(gray, (1500, int(h * scale)), interpolation=cv2.INTER_AREA)
# Canny edges — paramètres standards documents
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
# Hough Lines probabiliste : rapide et robuste
lines = cv2.HoughLinesP(
edges, rho=1, theta=np.pi / 180, threshold=200,
minLineLength=gray.shape[1] // 4, # au moins 25% de la largeur
maxLineGap=20,
)
if lines is None or len(lines) == 0:
return 0.0
# Calculer l'angle de chaque ligne en degrés
angles = []
for line in lines:
x1, y1, x2, y2 = line[0]
if x2 == x1:
continue # ligne verticale, ignorée
angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
# On ne garde que les lignes proches de l'horizontale
if abs(angle) < NEAR_HORIZONTAL_BAND:
angles.append(angle)
if not angles:
return 0.0
# Moyenne robuste : médiane plutôt que mean, moins sensible aux outliers
angle = float(np.median(angles))
if abs(angle) > MAX_ANGLE_DEG:
return 0.0 # suspect → on ne corrige pas
return angle
def deskew_image(img: Image.Image,
angle: float | None = None,
min_angle: float = MIN_ANGLE_DEG) -> Tuple[Image.Image, float]:
"""Redresse une image si le skew détecté dépasse `min_angle`.
Retourne (image_eventuellement_rotee, angle_applique).
Si |angle| < min_angle, retourne l'image inchangée et angle=0.0.
"""
if angle is None:
angle = detect_skew_angle(img)
if abs(angle) < min_angle:
return img, 0.0
# PIL.Image.rotate : positive angle = counter-clockwise
# detect_skew retourne positif = clockwise → on inverse pour PIL
rotated = img.rotate(
angle,
resample=Image.Resampling.BICUBIC,
expand=False,
fillcolor="white",
)
return rotated, angle
def deskew_file(src: Path, dst: Path | None = None,
min_angle: float = MIN_ANGLE_DEG) -> float:
"""Version fichier → fichier. Écrase `src` si `dst` est None.
Retourne l'angle appliqué (0.0 si pas de rotation)."""
img = Image.open(src)
rotated, angle = deskew_image(img, min_angle=min_angle)
out = dst or src
rotated.save(out, "PNG", optimize=True)
return angle
if __name__ == "__main__":
import sys
import glob
paths = [Path(p) for p in (sys.argv[1:] or sorted(glob.glob(".cache/images/*/page_01.png")))]
print(f"Deskew sur {len(paths)} images (seuil={MIN_ANGLE_DEG}°)...")
total_corrected = 0
for p in paths:
angle = detect_skew_angle(Image.open(p))
mark = "" if abs(angle) >= MIN_ANGLE_DEG else "·"
if abs(angle) >= MIN_ANGLE_DEG:
total_corrected += 1
print(f" {mark} {p} : {angle:+.2f}°")
print(f"\n{total_corrected}/{len(paths)} images auraient besoin d'un redressement.")

View File

@@ -1,211 +1,84 @@
"""Orchestration d'extraction pour un dossier OGC."""
import json
import re
"""Orchestration d'extraction pour un dossier OGC.
Chaîne les étages du pipeline sans connaître leur implémentation interne :
ingest → routing → OCR page par page → enrichissement page-spécifique → validation ATIH
L'orchestration elle-même ne contient aucune logique métier : elle délègue à
`pipeline.recueil`, `pipeline.validation`, `pipeline.classify`, `pipeline.ocr_qwen`
et `pipeline.prompts`. Cela permet de tester indépendamment chaque étage.
"""
from __future__ import annotations
import time
from pathlib import Path
from PIL import Image
from .ingest import pdf_to_images
from .classify import detect_page_type, route_by_index
from .ingest import pdf_to_images
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import (
PAGE_TYPES, PROMPT_HEADER,
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
)
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie
from .prompts import PAGE_TYPES, PROMPT_HEADER
from .recueil import enrich_recueil, resolve_recueil_zones
from .validation import annotate as validate_annotate
_EMPTY_OBJ_PATTERN = re.compile(
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
re.DOTALL,
)
def _run_page_ocr(ocr: QwenVLOCR, image_path: Path, ptype: str) -> tuple[dict | None, str, float]:
"""Exécute le prompt principal associé à un type de page et parse le JSON.
def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
"""Détecte et tronque les boucles d'objets vides.
GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand
un tableau DAS ou actes est vide dans l'image. La sortie est alors
tronquée à `max_new_tokens` sans fermer le JSON → parse error.
On garde au plus `max_consecutive` objets vides puis on coupe.
Retourne (parsed_dict_ou_None, ocr_raw, elapsed_s). `parsed=None` quand
la page n'a pas de prompt structuré associé (concertation_med, hospit.).
"""
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
if len(matches) <= max_consecutive:
return text
# On coupe après la fin du `max_consecutive`-ième match
cut_at = matches[max_consecutive - 1].end()
return text[:cut_at]
def _close_open_json(text: str) -> str:
"""Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué."""
# Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for c in text:
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "{": depth_brace += 1
elif c == "}": depth_brace -= 1
elif c == "[": depth_bracket += 1
elif c == "]": depth_bracket -= 1
# Retirer les virgules traînantes
closed = text.rstrip().rstrip(",")
# Fermer en priorité les crochets ouverts (tableaux), puis les accolades
closed += "]" * max(0, depth_bracket)
closed += "}" * max(0, depth_brace)
return closed
def parse_json_output(raw: str) -> dict | None:
"""Tente d'extraire un JSON depuis la sortie GLM-OCR.
Stratégies successives :
1. parse direct après retrait des fences ```json
2. patch des virgules manquantes entre objets / tableaux
3. détection et troncature des boucles d'objets vides (cas fréquent sur
tableaux DAS/actes vides → boucle jusqu'à max_new_tokens)
4. fermeture des structures JSON ouvertes après troncature
"""
if not raw:
return None
text = raw.strip()
# 1) fences markdown
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 2) virgules manquantes entre `} {` et `] [`
patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text)
patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched)
try:
return json.loads(patched)
except json.JSONDecodeError:
pass
# 3) troncature des boucles d'objets vides puis 4) fermeture
trimmed = _truncate_empty_loop(patched)
closed = _close_open_json(trimmed)
try:
result = json.loads(closed)
result["_truncated_loop"] = True # trace de l'intervention
return result
except json.JSONDecodeError as e:
return {"_raw": raw, "_parse_error": str(e)}
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
"""Second passage VLM sur le crop zonal de la colonne Recodage.
Qwen nous renvoie la liste brute de tous les codes visibles (avec position
si présente). On classifie DP/DR/DAS en Python par règles :
- les 1ᵉʳ et 2ᵉ codes SANS position → DP puis DR (DR peut être vide si
le 2ᵉ code a déjà une position).
- tous les codes AVEC position → DAS.
Retourne un dict {dp, dr, das[]} ou None en cas d'échec.
"""
try:
img = Image.open(image_path)
w, h = img.size
x1, y1, x2, y2 = RECUEIL_RECODAGE_ZONE
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path)
except Exception:
return None
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
conf = PAGE_TYPES.get(ptype)
if not conf or conf["prompt"] == PROMPT_HEADER:
return None, "", 0.0
res = ocr.run(image_path, conf["prompt"], max_new_tokens=4096)
parsed = parse_json_output(res["text"])
if not isinstance(parsed, dict) or "_parse_error" in parsed:
return None
# Filtrer : ne garder que les codes au format CIM-10. Si le crop dépasse
# malgré tout dans la zone Actes, les CCAM (4 lettres + 3 chiffres) seront
# exclus ici.
cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
codes_raw = parsed.get("codes") or []
codes = []
for c in codes_raw:
if not isinstance(c, dict): continue
code = (c.get("code") or "").strip()
if code and cim10_re.match(code):
codes.append({
"code": code,
"position": str(c.get("position") or "").strip(),
})
# Classifier par règle métier :
# - 1er code sans position → DP
# - 2e code sans position → DR (sauf s'il est identique au DP : Qwen tend
# à dupliquer le DP quand DR est vide — on préfère DR="")
# - codes avec position → DAS
dp, dr = "", ""
das = []
dp_assigned = dr_assigned = False
for c in codes:
code, position = c["code"], c["position"]
if not position:
if not dp_assigned:
dp, dp_assigned = code, True
elif not dr_assigned:
if code == dp:
# doublon du DP → on considère que DR est vide
dr_assigned = True
else:
dr, dr_assigned = code, True
else:
das.append({"code": code, "position": ""})
else:
das.append(c)
return {
"dp": dp, "dr": dr, "das": das,
"_source": "crop_recodage",
"_elapsed_s": round(res["elapsed_s"], 2),
"_n_codes_raw": len(codes_raw),
"_n_codes_kept": len(codes),
}
return parsed, res["text"], round(res["elapsed_s"], 2)
def _merge_codage_reco(parsed: dict, reco: dict) -> None:
"""Fusionne le résultat du crop Recodage dans parsed["codage_reco"].
def _resolve_routing(images: list[Path], ocr: QwenVLOCR,
use_standard_routing: bool,
verbose: bool) -> tuple[list[str | None], list[str]]:
"""Détermine le type de chaque page, soit par ordre standard (une seule
vérification sur la page 1), soit par classification OCR page par page.
Politique : le crop est plus fiable (contexte isolé), il prime sur le
passage principal SAUF si le crop laisse vide un champ que le principal
avait bien lu.
Retourne (page_types, headers). `headers[i]` est vide si pas de classify
effectuée sur la page i.
"""
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
merged = {
"dp": reco.get("dp", "") or existing.get("dp", ""),
"dr": reco.get("dr", "") or existing.get("dr", ""),
"das": reco.get("das") or existing.get("das") or [],
}
parsed["codage_reco"] = merged
parsed.setdefault("_crop_recodage", {})["result"] = reco
page_types: list[str | None] = [None] * len(images)
headers: list[str] = [""] * len(images)
if use_standard_routing and images:
ptype1, header1 = detect_page_type(images[0], ocr)
if ptype1 == "recueil":
page_types = list(route_by_index(len(images)))
headers[0] = header1
if verbose:
print(" routing standard (page 1 = recueil OK)")
return page_types, headers
if verbose:
print(f" page 1 = {ptype1} → fallback classification")
# Fallback : classify page par page
for i, img in enumerate(images):
page_types[i], headers[i] = detect_page_type(img, ocr)
return page_types, headers
def extract_dossier(pdf_path: str | Path, verbose: bool = True,
use_standard_routing: bool = True) -> dict:
"""Pipeline complet d'un dossier : PDF → JSON structuré.
"""Pipeline complet d'un dossier : PDF → JSON structuré + annoté ATIH.
use_standard_routing=True (défaut) : route les pages par index selon
l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps.
Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par
"recueil" — si non, bascule en classification complète (fallback).
Étages :
1. `ingest.pdf_to_images` : PDF → PNG 300 dpi (avec deskew auto, cache)
2. `_resolve_routing` : type de chaque page
3. `_run_page_ocr` : OCR du schéma structuré par type de page
4. `recueil.enrich_recueil` : checkboxes + crop Recodage pour la page recueil
5. `validation.annotate` : validation ATIH de tous les codes extraits
Paramètre `use_standard_routing=True` exploite l'ordre standard des 6 pages
OGC et économise 5 appels OCR par dossier. Bascule automatique sur la
classification page-à-page si la page 1 n'est pas le recueil attendu.
"""
pdf_path = Path(pdf_path)
ocr = QwenVLOCR()
@@ -216,37 +89,22 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
if verbose:
print(f"[{pdf_path.name}] {len(images)} pages converties")
# Choix de stratégie de routing
page_types = [None] * len(images)
headers = [""] * len(images)
if use_standard_routing:
# Vérif rapide sur la page 1 (seul OCR de classification)
ptype1, header1 = detect_page_type(images[0], ocr)
if ptype1 == "recueil":
page_types = route_by_index(len(images))
headers[0] = header1
if verbose:
print(f" routing standard (page 1 = recueil OK)")
else:
if verbose:
print(f" page 1 = {ptype1} → fallback classification")
use_standard_routing = False
page_types, headers = _resolve_routing(images, ocr, use_standard_routing, verbose)
result = {
_, cb_zones = resolve_recueil_zones()
result: dict = {
"fichier": pdf_path.stem,
"pdf_hash": images[0].parent.name,
"pdf_hash": images[0].parent.name if images else "",
"pages": [],
"extraction": {},
}
for idx, img_path in enumerate(images, 1):
t0 = time.time()
if use_standard_routing:
ptype = page_types[idx - 1]
header_text = headers[idx - 1]
else:
ptype, header_text = detect_page_type(img_path, ocr)
page_info = {
ptype = page_types[idx - 1]
header_text = headers[idx - 1]
page_info: dict = {
"page": idx,
"type": ptype,
"header": header_text.strip(),
@@ -255,42 +113,20 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
if verbose:
print(f" p{idx}: {ptype}")
prompt_conf = PAGE_TYPES.get(ptype)
if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER:
res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096)
parsed = parse_json_output(res["text"])
page_info["ocr_raw"] = res["text"]
parsed, ocr_raw, elapsed = _run_page_ocr(ocr, img_path, ptype) if ptype else (None, "", 0.0)
if parsed is not None:
page_info["ocr_raw"] = ocr_raw
page_info["parsed"] = parsed
page_info["elapsed_s"] = round(res["elapsed_s"], 2)
page_info["elapsed_s"] = elapsed
# Enrichissement : checkboxes + normalisation champs booléens
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
# à cocher (cf. scratch/test_prompt_crop_v2.py).
if ptype == "recueil" and isinstance(parsed, dict):
cb = detect_accord_desaccord(img_path, RECUEIL_ACCORD_DESACCORD)
parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb # ratios + diff pour audit
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"
# → ne garder que le chiffre 0/1 de tête
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
# Second passage : crop de la colonne Recodage pour compenser
# la sous-extraction observée sur codage_reco.* en passage principal.
reco = _extract_recodage_crop(img_path, ocr)
if reco:
_merge_codage_reco(parsed, reco)
enrich_recueil(parsed, img_path, ocr, cb_zones)
page_info["parsed"] = parsed
# Indexer par type pour accès direct dans result["extraction"]
result["extraction"][ptype] = parsed
else:
# Pages non structurées : juste l'en-tête déjà OCR
page_info["elapsed_s"] = round(time.time() - t0, 2)
result["pages"].append(page_info)
# Post-traitement : validation ATIH de tous les codes extraits
result = validate_annotate(result)
return result
return validate_annotate(result)

View File

@@ -1,10 +1,16 @@
"""PDF → images PNG 300 dpi avec cache par hash SHA256."""
"""PDF → images PNG 300 dpi avec cache par hash SHA256.
Applique optionnellement un deskew automatique (redressement) sur chaque page
pour corriger le biais d'inclinaison des scans. Voir pipeline/deskew.py.
"""
import hashlib
import os
from pathlib import Path
from pdf2image import convert_from_path
from PIL import Image
from .deskew import deskew_image, MIN_ANGLE_DEG
DEFAULT_DPI = 300
CACHE_ROOT = Path(".cache/images")
@@ -18,23 +24,37 @@ def pdf_hash(pdf_path: str) -> str:
return h.hexdigest()[:16]
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI, cache_root: Path = CACHE_ROOT) -> list[Path]:
def pdf_to_images(pdf_path: str, dpi: int = DEFAULT_DPI,
cache_root: Path = CACHE_ROOT,
deskew: bool = True) -> list[Path]:
"""Convertit un PDF en PNG 300 dpi. Retourne la liste des chemins (1 par page).
Le cache est indexé par hash du PDF : un PDF inchangé n'est jamais reconverti.
Avec `deskew=True` (défaut), chaque page est redressée si son angle de skew
dépasse le seuil défini dans `pipeline.deskew.MIN_ANGLE_DEG` (0.3°). L'angle
appliqué est persisté dans un fichier `<page>.skew` à côté (pour audit).
"""
cache_root = Path(cache_root)
h = pdf_hash(pdf_path)
out_dir = cache_root / h
out_dir.mkdir(parents=True, exist_ok=True)
existing = sorted(out_dir.glob("page_*.png"))
# Le glob est strict pour ne pas attraper les crops intermédiaires
# (page_XX_recodage.png, etc.)
existing = sorted(p for p in out_dir.glob("page_*.png")
if p.stem.replace("page_", "").isdigit())
if existing:
return existing
pages = convert_from_path(pdf_path, dpi)
paths = []
for i, img in enumerate(pages, 1):
if deskew:
img, applied = deskew_image(img)
if applied != 0.0:
# Trace d'audit : on note l'angle corrigé
(out_dir / f"page_{i:02d}.skew").write_text(f"{applied:.3f}\n")
p = out_dir / f"page_{i:02d}.png"
img.save(p, "PNG", optimize=True)
paths.append(p)

136
pipeline/json_utils.py Normal file
View File

@@ -0,0 +1,136 @@
"""Parsing JSON tolérant pour les sorties des VLM.
Les VLM (Qwen, GLM-OCR, GOT-OCR…) produisent du JSON avec des anomalies
fréquentes :
- Encadrement par des fences markdown ```json ... ```
- Virgules manquantes entre objets ou éléments de tableau
- Boucles pathologiques d'objets vides `{"code":"","position":""}` répétés
jusqu'à `max_new_tokens`, ce qui tronque le JSON sans le fermer proprement
Ce module expose `parse_json_output()` qui applique plusieurs stratégies de
récupération avant d'abandonner. En dernier recours, il renvoie un dict avec
`_raw` + `_parse_error` pour audit, jamais `None` (ce qui casserait le pipeline).
"""
from __future__ import annotations
import json
import re
# Pattern qui matche un objet vide générique {"code":"","position":"",...}
_EMPTY_OBJ_PATTERN = re.compile(
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
re.DOTALL,
)
_FENCE_OPEN_RE = re.compile(r"^```(?:json)?\s*")
_FENCE_CLOSE_RE = re.compile(r"\s*```$")
_MISSING_COMMA_OBJ_RE = re.compile(r"\}\s*\n(\s*\{)")
_MISSING_COMMA_ARR_RE = re.compile(r"\]\s*\n(\s*\[)")
def strip_fences(text: str) -> str:
"""Retire un éventuel encadrement ```json ... ```."""
text = _FENCE_OPEN_RE.sub("", text.strip())
text = _FENCE_CLOSE_RE.sub("", text)
return text
def patch_missing_commas(text: str) -> str:
"""Ajoute les virgules manquantes entre `}\\n{` et `]\\n[`.
Les VLM omettent fréquemment ces virgules dans leurs sorties JSON.
"""
text = _MISSING_COMMA_OBJ_RE.sub(r"},\n\1", text)
text = _MISSING_COMMA_ARR_RE.sub(r"],\n\1", text)
return text
def truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
"""Tronque les boucles d'objets vides (`{"code":"","position":""}` répété).
Cas d'usage : quand un tableau DAS ou Actes est vide dans l'image, le
VLM a parfois tendance à générer le même objet vide en boucle jusqu'à
saturer `max_new_tokens`. La sortie est alors tronquée sans fermer le
JSON → parse error. Garder au maximum `max_consecutive` occurrences.
"""
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
if len(matches) <= max_consecutive:
return text
cut_at = matches[max_consecutive - 1].end()
return text[:cut_at]
def close_open_json(text: str) -> str:
"""Ajoute les brackets/braces manquants pour fermer un JSON tronqué.
Compte les accolades et crochets non-balancés en ignorant ceux à
l'intérieur d'une chaîne, puis ferme dans le bon ordre (tableaux
ouverts d'abord, puis objets).
"""
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for c in text:
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "{":
depth_brace += 1
elif c == "}":
depth_brace -= 1
elif c == "[":
depth_bracket += 1
elif c == "]":
depth_bracket -= 1
closed = text.rstrip().rstrip(",")
closed += "]" * max(0, depth_bracket)
closed += "}" * max(0, depth_brace)
return closed
def parse_json_output(raw: str) -> dict | None:
"""Parse une sortie VLM en dict. Applique plusieurs stratégies :
1. strip des fences markdown ```json
2. parse direct
3. patch des virgules manquantes
4. troncature des boucles d'objets vides + fermeture du JSON
En cas d'échec de toutes les stratégies, retourne
`{"_raw": raw, "_parse_error": str}` pour permettre l'audit manuel
plutôt que de casser le pipeline.
"""
if not raw:
return None
text = strip_fences(raw)
try:
return json.loads(text)
except json.JSONDecodeError:
pass
patched = patch_missing_commas(text)
try:
return json.loads(patched)
except json.JSONDecodeError:
pass
trimmed = truncate_empty_loop(patched)
closed = close_open_json(trimmed)
try:
result = json.loads(closed)
if isinstance(result, dict):
result["_truncated_loop"] = True
return result
except json.JSONDecodeError as e:
return {"_raw": raw, "_parse_error": str(e)}

View File

@@ -27,22 +27,64 @@ class QwenVLOCR:
def _init_model(self):
t0 = time.time()
import os as _os
# max_pixels limite le nombre de patches visuels pour éviter l'OOM
# sur images 300 dpi (2481x3509). ~800 patches = équilibre qualité/VRAM,
# tient confortablement dans ~5-6 Go même avec d'autres processus GPU
# en arrière-plan. Configurable via env var QWEN_MAX_PIXELS (en patches).
import os as _os
max_pixels = int(_os.environ.get("QWEN_MAX_PIXELS", 800)) * 28 * 28
self.processor = AutoProcessor.from_pretrained(
MODEL_PATH,
min_pixels=256 * 28 * 28,
max_pixels=max_pixels,
)
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH,
torch_dtype=torch.bfloat16,
device_map="auto",
)
# Device : "auto" par défaut (GPU si dispo), "cpu" pour forcer le CPU
# quand la VRAM est saturée par d'autres process. Configurable via
# QWEN_DEVICE=cpu.
device = _os.environ.get("QWEN_DEVICE", "auto").lower()
if device == "cpu":
# Sur CPU on cherche à maximiser le throughput :
# 1. Utiliser tous les cores via torch.set_num_threads (set_num_threads
# prime sur OMP_NUM_THREADS pour les ops PyTorch natifs).
# 2. Choisir bfloat16 si le CPU le supporte nativement (Zen 5,
# Zen 4, Intel Sapphire Rapids+ ont AVX-512 BF16). Sinon float32.
n_threads = int(_os.environ.get("TORCH_NUM_THREADS", _os.cpu_count() or 8))
torch.set_num_threads(n_threads)
try:
torch.set_num_interop_threads(n_threads)
except RuntimeError:
pass # déjà initialisé, ignorer
# Détection AVX-512 BF16 via /proc/cpuinfo (Linux)
use_bf16 = False
try:
with open("/proc/cpuinfo") as f:
flags = f.read()
use_bf16 = "avx512_bf16" in flags or "amx_bf16" in flags
except Exception:
pass
dtype = torch.bfloat16 if use_bf16 else torch.float32
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH,
torch_dtype=dtype,
device_map={"": "cpu"},
low_cpu_mem_usage=True,
)
self.device_used = "cpu"
self.cpu_threads = n_threads
self.cpu_dtype = str(dtype).replace("torch.", "")
else:
self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_PATH,
torch_dtype=torch.bfloat16,
device_map="auto",
)
self.device_used = "cuda" if torch.cuda.is_available() else "cpu"
self.cpu_threads = None
self.cpu_dtype = None
self.model.eval()
self.load_time = time.time() - t0
self.vram_gb = torch.cuda.memory_allocated() / 1e9 if torch.cuda.is_available() else 0.0

203
pipeline/recueil.py Normal file
View File

@@ -0,0 +1,203 @@
"""Logique spécifique à la page `recueil` (fiche médicale de recueil OGC).
Regroupe tout ce qui concerne cette page — la plus riche et la plus difficile
à extraire du dossier OGC — séparé de l'orchestration générale :
- Résolution des zones configurables (crop Recodage, checkboxes)
- Second passage VLM sur le crop de la colonne Recodage
- Fusion du résultat crop dans le JSON principal
- Classification des codes CIM-10 en DP/DR/DAS par règle métier
- Enrichissement post-extraction (checkboxes Accord/Désaccord, ghs_injustifie)
Les fonctions sont testables indépendamment de Qwen quand on leur fournit
déjà les sorties OCR brutes.
"""
from __future__ import annotations
import re
import time
from pathlib import Path
from typing import Any
from PIL import Image
from .checkboxes import (
CheckboxZones,
RECUEIL_ACCORD_DESACCORD,
detect_accord_desaccord,
parse_ghs_injustifie,
)
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE
from .zones_config import get_zone, load_config
# ============================================================
# Résolution des zones (config JSON + fallback sur les defaults)
# ============================================================
def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]:
"""Charge les zones de la page recueil depuis la config utilisateur,
avec fallback sur les constantes compilées si la config est absente.
Retourne (zone_crop_recodage, zones_accord_desaccord).
"""
cfg = load_config()
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
acc = get_zone("recueil", "accord_checkbox", cfg)
des = get_zone("recueil", "desaccord_checkbox", cfg)
if acc and des:
cb = CheckboxZones(accord=acc, desaccord=des)
else:
cb = RECUEIL_ACCORD_DESACCORD
return reco, cb
# ============================================================
# Classification CIM-10 → DP / DR / DAS (pur, testable sans VLM)
# ============================================================
CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]:
"""Filtre une liste de codes OCR bruts pour ne garder que les CIM-10.
Les VLM peuvent parfois lire des codes CCAM (actes) dans un crop qui
dépasse sur le bloc Actes. On les retire ici pour ne pas polluer les DAS.
"""
kept = []
for c in codes_raw or []:
if not isinstance(c, dict):
continue
code = (c.get("code") or "").strip()
if code and CIM10_RE.match(code):
kept.append({
"code": code,
"position": str(c.get("position") or "").strip(),
})
return kept
def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]:
"""Classifie une liste de codes {code, position} en DP, DR et liste de DAS.
Règle métier :
- 1er code sans position → DP
- 2e code sans position → DR (ignoré si identique au DP : le VLM peut
dupliquer le DP quand la case DR est visuellement vide)
- tous les codes avec position → DAS
- codes sans position au-delà du 2e → DAS sans position (pour ne rien perdre)
"""
dp, dr = "", ""
das: list[dict] = []
dp_assigned = dr_assigned = False
for c in codes:
code, position = c["code"], c["position"]
if not position:
if not dp_assigned:
dp, dp_assigned = code, True
elif not dr_assigned:
if code == dp:
dr_assigned = True # doublon DP → DR vide
else:
dr, dr_assigned = code, True
else:
das.append({"code": code, "position": ""})
else:
das.append(c)
return dp, dr, das
# ============================================================
# Second passage VLM sur crop Recodage
# ============================================================
def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR,
zone: tuple[float, float, float, float] | None = None
) -> dict | None:
"""Execute un second passage VLM sur le crop zonal de la colonne Recodage.
Sauvegarde le crop à côté de l'image source (suffixe `_recodage.png`)
pour audit. Retourne un dict avec `dp/dr/das` + métadonnées, ou None
en cas d'échec d'OCR ou de parsing.
"""
try:
img = Image.open(image_path)
w, h = img.size
z = zone
if z is None:
z, _ = resolve_recueil_zones()
x1, y1, x2, y2 = z
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path)
except Exception:
return None
t0 = time.time()
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
parsed = parse_json_output(res["text"])
if not isinstance(parsed, dict) or "_parse_error" in parsed:
return None
codes = filter_cim10_codes(parsed.get("codes") or [])
dp, dr, das = classify_codes_dp_dr_das(codes)
return {
"dp": dp, "dr": dr, "das": das,
"_source": "crop_recodage",
"_elapsed_s": round(res["elapsed_s"], 2),
"_n_codes_raw": len(parsed.get("codes") or []),
"_n_codes_kept": len(codes),
}
def merge_codage_reco(parsed: dict, reco: dict) -> None:
"""Fusionne le résultat du crop Recodage dans `parsed["codage_reco"]`.
Politique de merge : le crop est plus fiable (contexte isolé) donc il
prime sur le passage principal. Exception : si un champ du crop est vide
mais que le passage principal l'a rempli, on garde celui du passage
principal (on ne dégrade jamais un résultat existant).
"""
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
parsed["codage_reco"] = {
"dp": reco.get("dp", "") or existing.get("dp", ""),
"dr": reco.get("dr", "") or existing.get("dr", ""),
"das": reco.get("das") or existing.get("das") or [],
}
parsed.setdefault("_crop_recodage", {})["result"] = reco
# ============================================================
# Enrichissement post-extraction d'une page recueil
# ============================================================
def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR,
cb_zones: CheckboxZones | None = None) -> dict:
"""Enrichit un JSON recueil parsé avec :
- checkbox accord/désaccord (méthode densité pixels, indépendante du VLM)
- normalisation `ghs_injustifie` → 0 / 1 / ""
- second passage VLM sur le crop Recodage si besoin, fusionné dans `codage_reco`
Modifie `parsed` en place et le renvoie (pratique pour chaînage).
"""
if not isinstance(parsed, dict):
return parsed
zones = cb_zones or resolve_recueil_zones()[1]
# Checkboxes accord / désaccord
cb = detect_accord_desaccord(image_path, zones)
parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb
# Normalisation ghs_injustifie
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
# Second passage Recodage
reco = run_recodage_crop_pass(image_path, ocr)
if reco:
merge_codage_reco(parsed, reco)
return parsed

View File

@@ -27,7 +27,44 @@ if str(_REPO_ROOT) not in sys.path:
import streamlit as st
from PIL import Image
# ----------------------------------------------------------------------------
# Compatibility shim : streamlit-drawable-canvas 0.9.3 utilise l'API privée
# `streamlit.elements.image.image_to_url` qui a été retirée à partir de
# Streamlit ≈ 1.49. On réinjecte une implémentation équivalente fondée sur
# un data URI base64, ce qui permet au canvas de continuer à fonctionner
# sans downgrader Streamlit globalement.
#
# Remplacer ce shim par l'upgrade de streamlit-drawable-canvas si une version
# > 0.9.3 est publiée.
# ----------------------------------------------------------------------------
import base64 as _b64
import io as _io
from streamlit.elements import image as _st_image # type: ignore
if not hasattr(_st_image, "image_to_url"):
def _image_to_url_compat(image, width, clamp, channels, output_format,
image_id):
"""Convertit une PIL.Image en data URI compatible avec drawable-canvas."""
fmt = (output_format or "PNG").upper()
if fmt == "JPG":
fmt = "JPEG"
buf = _io.BytesIO()
image.save(buf, format=fmt)
b64 = _b64.b64encode(buf.getvalue()).decode("ascii")
mime = "image/jpeg" if fmt == "JPEG" else f"image/{fmt.lower()}"
return f"data:{mime};base64,{b64}"
_st_image.image_to_url = _image_to_url_compat # type: ignore[attr-defined]
# ----------------------------------------------------------------------------
from pipeline.ingest import pdf_to_images
from pipeline.zones_config import load_config, save_config, DEFAULT_CONFIG_PATH
try:
from streamlit_drawable_canvas import st_canvas
_HAS_CANVAS = True
except ImportError:
_HAS_CANVAS = False
# ============================================================
@@ -268,6 +305,130 @@ def render_page_editor(name: str, ptype: str, extract: dict, gold: dict | None):
st.code(page_meta.get("ocr_raw", ""), language="json")
def render_calibration_page():
"""Mode 'Calibration zones' : dessine des rectangles à la souris sur une
image de référence, sauvegarde dans pipeline/zones_config.json."""
st.header("🔧 Calibration des zones")
if not _HAS_CANVAS:
st.error(
"Le package `streamlit-drawable-canvas` n'est pas installé.\n"
"Installe-le avec : `pip install streamlit-drawable-canvas`"
)
return
pdfs = list_pdfs()
if not pdfs:
st.error("Aucun PDF disponible pour la calibration")
return
col_ctrl, _ = st.columns([1, 3])
with col_ctrl:
ref_name = st.selectbox(
"PDF de référence (bien cadré)",
[p.stem for p in pdfs], key="calib_pdf",
)
page_type = st.selectbox(
"Type de page", ["recueil"],
help="Aujourd'hui seule la page recueil a des zones configurables",
)
# Page numéro selon le type (recueil = page 1)
page_num = {"recueil": 1}.get(page_type, 1)
ref_pdf = next(p for p in pdfs if p.stem == ref_name)
img_path = pdf_to_images(str(ref_pdf))[page_num - 1]
img = Image.open(img_path)
img_w, img_h = img.size
# Charger config existante et préparer les zones
cfg = load_config()
existing_zones = cfg.get(page_type, {})
# On scale l'image pour tenir dans le canvas (largeur ~900 px max)
canvas_w = 900
scale = canvas_w / img_w
canvas_h = int(img_h * scale)
# Préparer les rectangles initiaux depuis la config
initial_rects = []
for zone_name, z in existing_zones.items():
if not isinstance(z, dict): continue
initial_rects.append({
"type": "rect",
"left": z["x1"] * canvas_w,
"top": z["y1"] * canvas_h,
"width": (z["x2"] - z["x1"]) * canvas_w,
"height": (z["y2"] - z["y1"]) * canvas_h,
"fill": "rgba(255, 100, 100, 0.15)",
"stroke": "red",
"strokeWidth": 2,
"label_name": zone_name,
})
st.caption(
"💡 Dessine un rectangle par zone à la souris. Les zones existantes "
"apparaissent déjà pré-dessinées. Tu peux les modifier (drag), "
"en ajouter, ou en supprimer (touche Suppr) puis cliquer sur "
"**Sauvegarder**."
)
drawing_mode = st.radio(
"Mode", ["rect", "transform"], horizontal=True,
format_func=lambda x: {"rect": "✏️ Dessiner", "transform": "🖱 Sélectionner / Déplacer"}[x],
key="calib_drawing_mode",
)
canvas_result = st_canvas(
fill_color="rgba(255, 100, 100, 0.15)",
stroke_width=2,
stroke_color="red",
background_image=img,
update_streamlit=True,
width=canvas_w,
height=canvas_h,
drawing_mode=drawing_mode,
initial_drawing={"objects": initial_rects, "version": "5.2.1"},
key="calib_canvas",
)
# Reconstituer la config à partir des rectangles dessinés
rects = (canvas_result.json_data or {}).get("objects", []) if canvas_result.json_data else []
st.markdown("### Zones détectées")
if not rects:
st.info("Aucun rectangle dessiné.")
return
new_zones = {}
for i, r in enumerate(rects):
if r.get("type") != "rect":
continue
# Récupérer le nom existant si présent, sinon demander
default_name = r.get("label_name") or f"zone_{i+1}"
name = st.text_input(
f"Nom de la zone {i+1}",
value=default_name, key=f"calib_name_{i}",
)
x1 = r["left"] / canvas_w
y1 = r["top"] / canvas_h
x2 = x1 + r["width"] / canvas_w
y2 = y1 + r["height"] / canvas_h
desc = existing_zones.get(name, {}).get("description", "")
desc = st.text_input(
f"Description (optionnel)", value=desc, key=f"calib_desc_{i}",
)
st.caption(f"Coords relatives : ({x1:.3f}, {y1:.3f}) → ({x2:.3f}, {y2:.3f})")
new_zones[name] = {"x1": round(x1, 4), "y1": round(y1, 4),
"x2": round(x2, 4), "y2": round(y2, 4),
"description": desc}
if st.button("💾 Sauvegarder la configuration", type="primary"):
cfg[page_type] = new_zones
path = save_config(cfg)
st.success(f"Configuration sauvegardée : {path}")
st.json(new_zones)
def main():
st.set_page_config(page_title="OGC Overlay", layout="wide")
@@ -280,6 +441,13 @@ def main():
st.title("🩺 Extraction OGC — review & gold set")
# Sélecteur de mode en haut de sidebar
with st.sidebar:
mode = st.radio("Mode", ["📋 Review dossier", "🔧 Calibration zones"])
if mode == "🔧 Calibration zones":
render_calibration_page()
return
pdfs = list_pdfs()
if not pdfs:
st.error(f"Aucun PDF trouvé dans {PDF_DIR}")

90
pipeline/zones_config.py Normal file
View File

@@ -0,0 +1,90 @@
"""Configuration des zones d'extraction éditable via l'overlay UI.
Les coordonnées sont relatives (0..1) dans l'image source. Elles sont chargées
au démarrage du pipeline et utilisées à la place des constantes en dur dans
`pipeline/prompts.py` et `pipeline/checkboxes.py` — avec fallback sur ces
constantes si la config n'est pas présente, pour ne pas casser l'existant.
Structure :
{
"recueil": {
"codage_reco": {"x1":0.77, "y1":0.330, "x2":0.97, "y2":0.490, "description":"..."},
"accord_checkbox": {"x1":..., "y1":..., "x2":..., "y2":..., "description":"..."},
"desaccord_checkbox":{...}
},
"concertation_2": {...}
}
Un fichier unique `zones_config.json` à la racine du projet, ou au chemin pointé
par la variable d'env `OGC_ZONES_CONFIG`.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
DEFAULT_CONFIG_PATH = Path(
os.environ.get("OGC_ZONES_CONFIG", "zones_config.json")
)
# Zones par défaut, identiques aux constantes actuelles dans prompts.py et
# checkboxes.py. Sert de fallback et de "mise à jour initiale" quand le
# fichier n'existe pas encore.
DEFAULTS: dict = {
"recueil": {
"codage_reco": {
"x1": 0.77, "y1": 0.330, "x2": 0.97, "y2": 0.490,
"description": "Colonne Recodage (DP / DR / DAS) — exclut le bloc Actes",
},
"accord_checkbox": {
"x1": 0.588, "y1": 0.838, "x2": 0.622, "y2": 0.860,
"description": "Case à cocher 'Accord'",
},
"desaccord_checkbox": {
"x1": 0.588, "y1": 0.858, "x2": 0.622, "y2": 0.880,
"description": "Case à cocher 'Désaccord'",
},
},
}
def load_config(path: Path = DEFAULT_CONFIG_PATH) -> dict:
"""Charge la config JSON, ou retourne les defaults si absente."""
if not path.exists():
return _deep_copy(DEFAULTS)
try:
raw = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return _deep_copy(DEFAULTS)
# Merge : les defaults sont une base, la config utilisateur vient par-dessus
merged = _deep_copy(DEFAULTS)
for page, zones in raw.items():
merged.setdefault(page, {}).update(zones)
return merged
def save_config(cfg: dict, path: Path = DEFAULT_CONFIG_PATH) -> Path:
path.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8")
return path
def get_zone(page_type: str, zone_name: str,
config: dict | None = None) -> tuple[float, float, float, float] | None:
"""Récupère une zone depuis la config ou les defaults.
Retourne (x1, y1, x2, y2) ou None si inconnue.
"""
cfg = config or load_config()
z = cfg.get(page_type, {}).get(zone_name)
if not isinstance(z, dict):
return None
try:
return (float(z["x1"]), float(z["y1"]), float(z["x2"]), float(z["y2"]))
except (KeyError, ValueError, TypeError):
return None
def _deep_copy(d: dict) -> dict:
return json.loads(json.dumps(d))

160
tests/test_checkboxes.py Normal file
View File

@@ -0,0 +1,160 @@
"""Tests unitaires pour pipeline.checkboxes."""
from __future__ import annotations
import numpy as np
import pytest
from PIL import Image
from pipeline.checkboxes import (
AMBIGU_MARGIN,
CheckboxZones,
RECUEIL_ACCORD_DESACCORD,
dark_ratio,
detect_accord_desaccord,
parse_ghs_injustifie,
)
# ============================================================
# parse_ghs_injustifie
# ============================================================
class TestParseGhsInjustifie:
@pytest.mark.parametrize("raw,expected", [
("0", "0"),
("1", "1"),
("0 SE 1 2 3 4 ATU FFM FSD", "0"),
("1 SE 2 ATU", "1"),
(" 0 ", "0"),
("", ""),
(None, ""),
("SE 1 2 3 4 ATU FFM FSD", ""), # pas de chiffre de tête
("abc", ""),
("2 SE 1", ""), # 2 n'est ni 0 ni 1
])
def test_cas_varies(self, raw, expected):
assert parse_ghs_injustifie(raw) == expected
# ============================================================
# dark_ratio (avec images synthétiques)
# ============================================================
def _solid_image(w: int, h: int, gray_value: int = 255) -> Image.Image:
arr = np.full((h, w), gray_value, dtype=np.uint8)
return Image.fromarray(arr, mode="L").convert("RGB")
def _image_with_dark_square(w: int, h: int,
square_bbox: tuple[float, float, float, float]) -> Image.Image:
"""Image blanche avec un carré noir dans la zone bbox (coords relatives)."""
arr = np.full((h, w), 255, dtype=np.uint8)
x1, y1, x2, y2 = square_bbox
arr[int(y1*h):int(y2*h), int(x1*w):int(x2*w)] = 0
return Image.fromarray(arr, mode="L").convert("RGB")
class TestDarkRatio:
def test_image_blanche(self):
img = _solid_image(100, 100, 255)
ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
assert ratio == 0.0
def test_image_noire(self):
img = _solid_image(100, 100, 0)
ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
assert ratio == 1.0
def test_inner_frac_ignore_les_bords(self):
"""Un carré noir occupe toute la zone mais avec un grand inner_frac
on ne voit que le centre, qui reste dans la zone noire."""
img = _image_with_dark_square(100, 100, (0.0, 0.0, 1.0, 1.0))
# Tout noir, peu importe inner_frac
assert dark_ratio(img, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35) == 1.0
def test_cadre_seul_vs_contenu_central(self):
"""Une case 'vide' (cadre seul) doit avoir un ratio inner_frac faible ;
une case 'cochée' (croix au centre) doit avoir un ratio plus élevé."""
# Simuler un cadre : carré noir sur le pourtour uniquement
w, h = 100, 100
arr = np.full((h, w), 255, dtype=np.uint8)
arr[:5, :] = 0; arr[-5:, :] = 0; arr[:, :5] = 0; arr[:, -5:] = 0
frame_only = Image.fromarray(arr, mode="L").convert("RGB")
# Cadre + croix au centre
arr2 = arr.copy()
# Une croix : 2 diagonales
for i in range(20, 80):
arr2[i, i] = 0
arr2[i, 100 - 1 - i] = 0
checked = Image.fromarray(arr2, mode="L").convert("RGB")
ratio_empty = dark_ratio(frame_only, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)
ratio_full = dark_ratio(checked, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)
# La case cochée doit avoir un ratio clairement plus élevé
assert ratio_full > ratio_empty + 0.05
# ============================================================
# detect_accord_desaccord (fixtures cache)
# ============================================================
class TestDetectAccordDesaccord:
"""Tests sur les images réelles du cache, avec ground truth vérifié
visuellement (cf. historique du projet, crops audités un par un).
Ground truth indexé par numéro d'OGC — le mapping vers le hash du cache
est résolu au runtime via pipeline.ingest.pdf_hash pour éviter de coder
les hashes en dur (fragile).
"""
# Ground truth vérifié visuellement sur les 18 dossiers 2018 CARC
GROUND_TRUTH_BY_OGC = {
1: "accord",
7: "accord",
9: "désaccord",
18: "désaccord",
20: "désaccord",
27: "désaccord",
29: "accord",
55: "accord",
66: "désaccord",
68: "accord",
69: "accord",
74: "désaccord",
76: "désaccord",
84: "accord",
86: "désaccord",
97: "accord",
99: "désaccord",
}
@pytest.fixture
def cached_pages_with_truth(self):
"""Résout le mapping numéro OGC → page_01.png disponible au runtime."""
from pathlib import Path
from pipeline.ingest import pdf_hash
pdf_dir = Path("2018 CARC")
if not pdf_dir.is_dir():
pytest.skip("répertoire 2018 CARC/ absent")
found = {}
for n, expected in self.GROUND_TRUTH_BY_OGC.items():
pdf = pdf_dir / f"OGC {n}.pdf"
if not pdf.exists():
continue
h = pdf_hash(str(pdf))
img = Path(f".cache/images/{h}/page_01.png")
if img.exists():
found[f"OGC {n}"] = (str(img), expected)
if not found:
pytest.skip("pas de cache d'images disponible — lance le pipeline d'abord")
return found
def test_ground_truth_echantillon(self, cached_pages_with_truth):
"""Sur les cas vérifiés visuellement, le détecteur doit matcher."""
errors = []
for name, (path, expected) in cached_pages_with_truth.items():
r = detect_accord_desaccord(path)
if r["decision"] != expected:
errors.append(f"{name}: attendu={expected}, got={r}")
assert not errors, "\n".join(errors)

140
tests/test_deskew.py Normal file
View File

@@ -0,0 +1,140 @@
"""Tests unitaires pour pipeline.deskew.
Tests sans dépendance GPU. Génère des images synthétiques en code + utilise
les images du cache pour les cas réels.
"""
from __future__ import annotations
import math
from pathlib import Path
import numpy as np
import pytest
from PIL import Image
from pipeline.deskew import (
MAX_ANGLE_DEG,
MIN_ANGLE_DEG,
NEAR_HORIZONTAL_BAND,
deskew_image,
detect_skew_angle,
)
# ============================================================
# Helpers : fabriquer une image synthétique avec des lignes
# ============================================================
def _make_grid_image(w: int = 800, h: int = 1000,
n_lines: int = 30, angle_deg: float = 0.0) -> Image.Image:
"""Crée une image blanche avec `n_lines` lignes horizontales équi-réparties,
optionnellement tournée d'un angle donné. Parfaite pour tester le détecteur.
"""
arr = np.ones((h, w), dtype=np.uint8) * 255
for i in range(1, n_lines + 1):
y = int(i * h / (n_lines + 1))
arr[y - 1:y + 1, 50:w - 50] = 0 # ligne horizontale noire de 2 px
img = Image.fromarray(arr, mode="L")
if angle_deg != 0.0:
# PIL.rotate : angle positif = sens trigonométrique (= anti-horaire)
# On veut tester avec notre convention (positif = horaire) donc
# on inverse ici pour cohérence avec detect_skew_angle
img = img.rotate(-angle_deg, resample=Image.Resampling.BICUBIC,
expand=False, fillcolor="white")
return img.convert("RGB")
# ============================================================
# Tests de détection
# ============================================================
class TestDetectSkewAngle:
def test_image_parfaitement_droite(self):
img = _make_grid_image()
angle = detect_skew_angle(img)
assert abs(angle) < 0.1, f"image droite doit donner ~0°, got {angle}"
@pytest.mark.parametrize("input_angle", [1.0, 2.0, -3.0, 4.0])
def test_detecte_angles_modérés(self, input_angle):
"""Sur notre image synthétique (30 lignes), la sensibilité est ~1°.
Sur de vraies fiches OGC avec 300+ lignes de tableaux, la sensibilité
descend à 0.3° (cf. test réel sur OGC 1 : +0.91° détecté).
"""
img = _make_grid_image(angle_deg=input_angle)
detected = detect_skew_angle(img)
assert abs(detected - input_angle) < 0.5, \
f"attendu ~{input_angle}°, détecté {detected}°"
def test_image_sans_lignes_retourne_zero(self):
# Image totalement uniforme → aucune ligne détectable
arr = np.ones((500, 500), dtype=np.uint8) * 255
img = Image.fromarray(arr, mode="L").convert("RGB")
assert detect_skew_angle(img) == 0.0
def test_angle_extrême_rejeté(self):
# Une rotation de 45° dépasse MAX_ANGLE_DEG → on refuse de corriger
img = _make_grid_image(angle_deg=45.0)
detected = detect_skew_angle(img)
# Soit 0.0 (pas de lignes quasi-horizontales à ±15°), soit borné
assert abs(detected) < MAX_ANGLE_DEG or detected == 0.0
# ============================================================
# Tests de correction (deskew_image)
# ============================================================
class TestDeskewImage:
def test_image_droite_inchangée(self):
img = _make_grid_image()
rotated, applied = deskew_image(img)
assert applied == 0.0
# Identité bit à bit
assert np.array_equal(np.array(rotated), np.array(img))
def test_image_inclinée_corrigée(self):
img = _make_grid_image(angle_deg=2.0)
rotated, applied = deskew_image(img)
# On attend qu'on applique un angle proche de 2° (convention positive)
assert abs(applied) > MIN_ANGLE_DEG, \
f"devrait corriger, got applied={applied}"
# Après rotation, l'angle résiduel doit être très faible
residual = detect_skew_angle(rotated)
assert abs(residual) < 0.5, \
f"angle résiduel trop grand après correction : {residual}°"
def test_seuil_min_angle_respecté(self):
# Un skew juste sous le seuil ne doit pas être corrigé
img = _make_grid_image(angle_deg=MIN_ANGLE_DEG / 2)
_, applied = deskew_image(img)
assert applied == 0.0
def test_angle_forcé(self):
"""On peut forcer un angle arbitraire indépendamment de la détection."""
img = _make_grid_image() # droit
rotated, applied = deskew_image(img, angle=5.0)
assert applied == 5.0
# Taille conservée
assert rotated.size == img.size
# ============================================================
# Tests avec fixtures réelles (si cache dispo)
# ============================================================
class TestOnRealCachedPages:
"""Ces tests s'exécutent seulement si le cache d'images existe."""
@pytest.fixture
def cached_pages(self):
paths = sorted(Path(".cache/images").glob("*/page_01.png"))
if not paths:
pytest.skip("pas de cache d'images disponible")
return paths
def test_detection_ne_crash_pas(self, cached_pages):
"""Sur toutes les pages cachées, detect_skew_angle ne doit pas planter."""
for p in cached_pages[:5]: # limite pour la vitesse
img = Image.open(p)
angle = detect_skew_angle(img)
assert isinstance(angle, float)
assert abs(angle) <= MAX_ANGLE_DEG

119
tests/test_json_utils.py Normal file
View File

@@ -0,0 +1,119 @@
"""Tests unitaires pour pipeline.json_utils."""
from __future__ import annotations
from pipeline.json_utils import (
close_open_json,
parse_json_output,
patch_missing_commas,
strip_fences,
truncate_empty_loop,
)
class TestStripFences:
def test_fence_json(self):
raw = '```json\n{"a": 1}\n```'
assert strip_fences(raw).strip() == '{"a": 1}'
def test_fence_simple(self):
raw = '```\n{"a": 1}\n```'
assert strip_fences(raw).strip() == '{"a": 1}'
def test_pas_de_fence(self):
raw = '{"a": 1}'
assert strip_fences(raw).strip() == '{"a": 1}'
class TestPatchMissingCommas:
def test_objets_consecutifs(self):
raw = '[\n{"a": 1}\n{"b": 2}\n]'
patched = patch_missing_commas(raw)
assert '},' in patched
def test_deja_correct(self):
raw = '{"a": 1}'
assert patch_missing_commas(raw) == raw
class TestTruncateEmptyLoop:
def test_moins_que_seuil(self):
raw = '[{"code":"","position":""},{"code":"","position":""}]'
# 2 objets vides = seuil par défaut, rien à tronquer
out = truncate_empty_loop(raw, max_consecutive=2)
assert out == raw
def test_boucle_tronquée(self):
objs = ['{"code":"","position":""}'] * 10
raw = '[' + ','.join(objs)
out = truncate_empty_loop(raw, max_consecutive=2)
# Après troncature, ne doit contenir que 2 occurrences
assert out.count('{"code":""') == 2
def test_pas_de_boucle(self):
raw = '[{"code":"K650","position":"1"}]'
assert truncate_empty_loop(raw) == raw
class TestCloseOpenJson:
def test_deja_ferme(self):
raw = '{"a": [1, 2]}'
assert close_open_json(raw) == raw
def test_accolade_manquante(self):
raw = '{"a": 1'
closed = close_open_json(raw)
assert closed == '{"a": 1}'
def test_crochet_manquant(self):
raw = '{"a": [1, 2'
closed = close_open_json(raw)
assert closed == '{"a": [1, 2]}'
def test_accolades_et_crochets_imbriqués(self):
raw = '{"a": {"b": [1, 2'
closed = close_open_json(raw)
assert closed == '{"a": {"b": [1, 2]}}'
def test_virgule_trainante_supprimée(self):
raw = '{"a": 1, '
closed = close_open_json(raw)
assert closed == '{"a": 1}'
def test_accolade_dans_string_ignorée(self):
raw = '{"a": "{ ceci est une { accolade dans une string"'
closed = close_open_json(raw)
# On ajoute juste l'accolade finale manquante
assert closed == raw + '}'
class TestParseJsonOutput:
def test_json_valide(self):
assert parse_json_output('{"a": 1}') == {"a": 1}
def test_vide(self):
assert parse_json_output("") is None
assert parse_json_output(None) is None
def test_fences_markdown(self):
assert parse_json_output('```json\n{"a": 1}\n```') == {"a": 1}
def test_virgule_manquante_recuperee(self):
raw = '[\n{"a": 1}\n{"b": 2}\n]'
result = parse_json_output(raw)
assert result == [{"a": 1}, {"b": 2}]
def test_boucle_tronquée_fermée(self):
objs = ['{"code":"","position":"","libelle":""}'] * 10
raw = '{"das": [\n' + ',\n'.join(objs) # non fermé
result = parse_json_output(raw)
assert isinstance(result, dict)
assert "das" in result
# Après troncature, 2 objets vides max, puis JSON refermé
assert result.get("_truncated_loop") is True
def test_fallback_retourne_raw(self):
"""Quand rien ne marche, on renvoie un dict avec _raw + _parse_error."""
raw = "ceci n'est pas du JSON du tout !"
result = parse_json_output(raw)
assert result.get("_raw") == raw
assert "_parse_error" in result

145
tests/test_recueil.py Normal file
View File

@@ -0,0 +1,145 @@
"""Tests unitaires pour pipeline.recueil (logique métier de la page recueil).
Les fonctions testées ici sont toutes pures (pas d'appel au VLM) :
- filter_cim10_codes
- classify_codes_dp_dr_das
- merge_codage_reco
- resolve_recueil_zones (juste lecture de config)
"""
from __future__ import annotations
from pipeline.recueil import (
classify_codes_dp_dr_das,
filter_cim10_codes,
merge_codage_reco,
resolve_recueil_zones,
)
class TestFilterCim10Codes:
def test_codes_valides_conservés(self):
codes = [
{"code": "K650", "position": "1"},
{"code": "T814", "position": "2"},
{"code": "Z954 *", "position": "3"},
]
out = filter_cim10_codes(codes)
assert len(out) == 3
assert out[0]["code"] == "K650"
def test_ccam_rejeté(self):
"""Un code CCAM (4 lettres + 3 chiffres) ne doit pas passer le filtre CIM-10."""
codes = [
{"code": "K650", "position": ""},
{"code": "EBFA012", "position": "1"}, # CCAM
]
out = filter_cim10_codes(codes)
assert len(out) == 1
assert out[0]["code"] == "K650"
def test_code_vide_rejeté(self):
codes = [{"code": "", "position": ""}, {"code": "K650", "position": ""}]
out = filter_cim10_codes(codes)
assert len(out) == 1
def test_non_dict_ignoré(self):
codes = ["K650", None, {"code": "T814", "position": ""}]
out = filter_cim10_codes(codes)
assert len(out) == 1
def test_liste_vide(self):
assert filter_cim10_codes([]) == []
assert filter_cim10_codes(None) == []
class TestClassifyCodesDpDrDas:
def test_cas_nominal(self):
"""1er sans position = DP, 2e sans position = DR, puis DAS avec positions."""
codes = [
{"code": "K650", "position": ""},
{"code": "T814", "position": ""},
{"code": "Z954", "position": "2"},
{"code": "R33", "position": "3"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == "T814"
assert [d["code"] for d in das] == ["Z954", "R33"]
def test_dr_vide_non_duplique_dp(self):
"""Quand Qwen duplique le DP (parce que DR est visuellement vide),
on doit considérer que DR est vide, pas DR = DP."""
codes = [
{"code": "K650", "position": ""},
{"code": "K650", "position": ""}, # doublon
{"code": "T814", "position": "2"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == "" # dédupliqué
assert len(das) == 1
def test_seulement_dp(self):
codes = [{"code": "K650", "position": ""}]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == ""
assert das == []
def test_tous_avec_positions(self):
"""Si tous les codes ont une position, DP et DR sont vides, tout en DAS."""
codes = [
{"code": "K650", "position": "1"},
{"code": "T814", "position": "2"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == ""
assert dr == ""
assert len(das) == 2
def test_vide(self):
dp, dr, das = classify_codes_dp_dr_das([])
assert (dp, dr, das) == ("", "", [])
class TestMergeCodageReco:
def test_crop_prime_sur_passage_principal(self):
parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
reco = {"dp": "K650", "dr": "T814",
"das": [{"code": "Z954", "position": "2"}]}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650"
assert parsed["codage_reco"]["dr"] == "T814"
assert len(parsed["codage_reco"]["das"]) == 1
def test_crop_vide_garde_passage_principal(self):
"""Si le crop a un champ vide mais le passage principal l'avait rempli,
on ne dégrade pas : on garde le passage principal."""
parsed = {"codage_reco": {"dp": "K650", "dr": "", "das": []}}
reco = {"dp": "", "dr": "", "das": []}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650" # préservé
def test_codage_reco_initialement_absent(self):
parsed = {}
reco = {"dp": "K650", "dr": "", "das": []}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650"
def test_trace_crop_ajoutee(self):
parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
reco = {"dp": "K650", "_elapsed_s": 1.5}
merge_codage_reco(parsed, reco)
assert parsed["_crop_recodage"]["result"]["_elapsed_s"] == 1.5
class TestResolveRecueilZones:
def test_fallback_constantes(self):
"""Sans config utilisateur, on a les zones par défaut."""
reco, cb = resolve_recueil_zones()
# 4 coords flottantes
assert len(reco) == 4
assert all(isinstance(v, float) for v in reco)
# Checkbox zones
assert len(cb.accord) == 4
assert len(cb.desaccord) == 4

118
tests/test_schema.py Normal file
View File

@@ -0,0 +1,118 @@
"""Tests unitaires pour pipeline.schema (nettoyage JSON)."""
from __future__ import annotations
from pipeline.schema import (
CLEAN_FIELDS_RECUEIL,
DEBUG_FIELDS,
SCHEMA_VERSION,
clean_dossier,
)
def _sample_raw():
"""Un JSON pipeline type, riche en champs debug."""
return {
"fichier": "OGC 7",
"pdf_hash": "abc123",
"pages": [{"page": 1, "type": "recueil"}],
"extraction": {
"recueil": {
"etablissement": "CLINIQUE X",
"finess": "330780206",
"ghm_etab": "11M122",
"ghs_etab": "4323",
"codage_etab": {"dp": "K650"},
"accord_desaccord": "accord",
"_checkbox_debug": {"ratio_accord": 0.38, "ratio_desaccord": 0.19},
"_parse_error": "whatever",
"_truncated_loop": True,
"_crop_recodage": {"dp": "K650", "_source": "crop"},
"_validation": {
"summary": {"valid": 3, "invalid": 0, "empty": 2, "total_codes": 3},
"cross_checks": {
"etab": {"checked": True, "coherent": True},
"reco": {"checked": False, "reason": "ghm manquant"},
},
"codage_etab": {
"dp": {"code": "K650", "valid": True, "libelle_ref": "Péritonite"},
"dr": {"code": "", "valid": None},
"das": [],
},
"codage_reco": {"dp": {}, "dr": {}, "das": []},
"ghm_etab": {"code": "11M122", "valid": True,
"ghs_possibles": ["4323"]},
"ghs_etab": {"code": "4323", "valid": True},
"ghm_reco": {"code": "", "valid": None},
"ghs_reco": {"code": "", "valid": None},
},
},
"concertation_2": {
"ghs_initial": "4323",
"ghs_final": "4323",
"decision": "retour_groupage_dim",
"date_concertation": "13/03/2018",
},
},
"_meta": {"pipeline_version": "v2", "ocr_model": "Qwen/Qwen2.5-VL-3B-Instruct"},
}
class TestCleanDossier:
def test_retourne_schema_version(self):
out = clean_dossier(_sample_raw())
assert out["schema_version"] == SCHEMA_VERSION
def test_retire_tous_les_champs_debug(self):
"""Aucun champ de DEBUG_FIELDS ne doit rester dans la sortie clean."""
out = clean_dossier(_sample_raw())
rec = out["extraction"]["recueil"]
for debug_field in DEBUG_FIELDS:
assert debug_field not in rec, \
f"{debug_field} devrait être retiré"
def test_garde_les_champs_metier(self):
out = clean_dossier(_sample_raw())
rec = out["extraction"]["recueil"]
for f in ["etablissement", "finess", "ghm_etab", "ghs_etab",
"codage_etab", "accord_desaccord"]:
assert f in rec, f"{f} doit être présent dans clean"
def test_validation_compactee(self):
"""La validation est conservée mais en format compact."""
out = clean_dossier(_sample_raw())
v = out["extraction"]["recueil"]["_validation"]
# summary garde tel quel
assert v["summary"]["valid"] == 3
# cross_checks compactés : juste le coherent booléen (ou None)
assert v["cross_checks"] == {
"etab_ghm_ghs_coherent": True,
"reco_ghm_ghs_coherent": None,
}
# Les codes validés gardent libelle_ref quand dispo
assert v["codage_etab"]["dp"]["valid"] is True
assert v["codage_etab"]["dp"].get("libelle_ref") == "Péritonite"
def test_concertation_2_conservee(self):
out = clean_dossier(_sample_raw())
c2 = out["extraction"]["concertation_2"]
assert c2["ghs_initial"] == "4323"
assert c2["decision"] == "retour_groupage_dim"
def test_champs_inconnus_ignorés(self):
"""Un champ qui n'est pas dans CLEAN_FIELDS_RECUEIL est retiré."""
raw = _sample_raw()
raw["extraction"]["recueil"]["champ_inventé"] = "poubelle"
out = clean_dossier(raw)
assert "champ_inventé" not in out["extraction"]["recueil"]
def test_meta_preservee(self):
out = clean_dossier(_sample_raw())
assert out["_meta"]["pipeline_version"] == "v2"
assert "Qwen" in out["_meta"]["ocr_model"]
def test_pas_de_modification_input(self):
"""La fonction ne doit pas modifier l'input."""
raw = _sample_raw()
before = raw["extraction"]["recueil"].copy()
_ = clean_dossier(raw)
assert raw["extraction"]["recueil"] == before

146
tests/test_validation.py Normal file
View File

@@ -0,0 +1,146 @@
"""Tests unitaires pour pipeline.validation."""
from __future__ import annotations
import pytest
from pipeline.validation import (
_check_ccam,
_check_cim10,
_check_ghm,
_check_ghs,
_cross_check_ghm_ghs,
annotate,
validate_recueil,
)
# ============================================================
# Vérifications par type de code
# ============================================================
class TestCheckCim10:
def test_code_valide(self):
r = _check_cim10("K650")
assert r["valid"] is True
assert "libelle_ref" in r
def test_code_vide(self):
assert _check_cim10("")["valid"] is None
assert _check_cim10(None)["valid"] is None
def test_code_avec_suffixe_pmsi(self):
# Les suffixes * et +N sont gérés par la normalisation
r = _check_cim10("C795 *")
assert r["valid"] is True
def test_code_invalide_avec_suggestion(self):
# K65O (O au lieu de 0) n'existe pas, mais K650 oui
r = _check_cim10("K65O")
assert r["valid"] is False
assert r.get("suggestion") == "K650"
def test_code_invalide_sans_suggestion(self):
# Code farfelu sans voisin proche
r = _check_cim10("ZZZZ9999")
assert r["valid"] is False
# suggestion peut être absente
assert r.get("suggestion") is None or r.get("suggestion") != "ZZZZ9999"
class TestCheckGhm:
def test_ghm_valide(self):
r = _check_ghm("11M122")
assert r["valid"] is True
assert isinstance(r.get("ghs_possibles"), list)
assert len(r["ghs_possibles"]) > 0
def test_ghm_invalide(self):
r = _check_ghm("99Z999")
assert r["valid"] is False
class TestCheckGhs:
def test_ghs_valide(self):
assert _check_ghs("4323")["valid"] is True
def test_ghs_invalide(self):
assert _check_ghs("99999")["valid"] is False
class TestCheckCcam:
def test_ccam_valide(self):
assert _check_ccam("EBFA012")["valid"] is True
def test_ccam_invalide(self):
assert _check_ccam("XXXX000")["valid"] is False
# ============================================================
# Cross-checks GHM ↔ GHS
# ============================================================
class TestCrossCheckGhmGhs:
def test_couple_coherent(self):
# 11M122 a bien 4323 dans ses GHS possibles
r = _cross_check_ghm_ghs("11M122", "4323")
assert r["checked"] is True
assert r["coherent"] is True
def test_couple_incoherent(self):
# 11M122 ne correspond pas à n'importe quel GHS
r = _cross_check_ghm_ghs("11M122", "9999")
assert r["checked"] is True
assert r["coherent"] is False
def test_ghm_manquant(self):
r = _cross_check_ghm_ghs("", "4323")
assert r["checked"] is False
def test_ghm_invalide(self):
r = _cross_check_ghm_ghs("99Z999", "4323")
assert r["checked"] is False
assert "invalide" in r["reason"].lower()
# ============================================================
# annotate (intégration)
# ============================================================
class TestAnnotate:
def test_annotate_json_vide(self):
out = annotate({"fichier": "TEST", "extraction": {}})
assert "fichier" in out
assert out["extraction"] == {}
def test_annotate_recueil_complet(self):
raw = {
"fichier": "TEST",
"extraction": {
"recueil": {
"codage_etab": {"dp": "K650", "dr": "", "das": [
{"code": "T814", "position": "2"},
]},
"codage_reco": {"dp": "", "dr": "", "das": []},
"ghm_etab": "11M122",
"ghs_etab": "4323",
"ghm_reco": "",
"ghs_reco": "",
},
},
}
out = annotate(raw)
v = out["extraction"]["recueil"]["_validation"]
assert v["codage_etab"]["dp"]["valid"] is True
assert v["ghm_etab"]["valid"] is True
assert v["cross_checks"]["etab"]["coherent"] is True
assert v["summary"]["valid"] >= 3
def test_annotate_preserve_source(self):
"""L'annotation ne doit pas modifier l'input (copie défensive)."""
raw = {
"fichier": "T",
"extraction": {"recueil": {"codage_etab": {"dp": "K650"}}},
}
out = annotate(raw)
assert "_validation" not in raw["extraction"]["recueil"]
assert "_validation" in out["extraction"]["recueil"]

View File

@@ -0,0 +1,85 @@
"""Tests unitaires pour pipeline.zones_config."""
from __future__ import annotations
import json
import pytest
from pipeline.zones_config import (
DEFAULTS,
get_zone,
load_config,
save_config,
)
class TestLoadConfig:
def test_fichier_absent_retourne_defaults(self, tmp_path):
cfg = load_config(tmp_path / "inexistant.json")
assert cfg == DEFAULTS
def test_charge_depuis_fichier(self, tmp_path):
path = tmp_path / "zones.json"
custom = {
"recueil": {
"codage_reco": {"x1": 0.5, "y1": 0.1, "x2": 0.9, "y2": 0.4,
"description": "test"},
},
}
path.write_text(json.dumps(custom))
cfg = load_config(path)
assert cfg["recueil"]["codage_reco"]["x1"] == 0.5
def test_merge_avec_defaults(self, tmp_path):
"""Les zones non définies dans le fichier tombent en défaut."""
path = tmp_path / "zones.json"
partial = {
"recueil": {"codage_reco": {"x1": 0.1, "y1": 0.2, "x2": 0.3, "y2": 0.4}},
}
path.write_text(json.dumps(partial))
cfg = load_config(path)
# User override appliqué
assert cfg["recueil"]["codage_reco"]["x1"] == 0.1
# Default gardé pour l'autre zone
assert cfg["recueil"]["accord_checkbox"] == DEFAULTS["recueil"]["accord_checkbox"]
def test_json_corrompu_retombe_sur_defaults(self, tmp_path):
path = tmp_path / "corrupt.json"
path.write_text("{ not valid json [")
cfg = load_config(path)
assert cfg == DEFAULTS
class TestSaveConfig:
def test_save_puis_load_round_trip(self, tmp_path):
path = tmp_path / "zones.json"
original = {
"recueil": {
"codage_reco": {"x1": 0.11, "y1": 0.22, "x2": 0.33, "y2": 0.44,
"description": "abc"},
},
}
save_config(original, path)
reloaded = load_config(path)
assert reloaded["recueil"]["codage_reco"]["x1"] == 0.11
assert reloaded["recueil"]["codage_reco"]["description"] == "abc"
class TestGetZone:
def test_zone_existante(self):
z = get_zone("recueil", "codage_reco")
assert isinstance(z, tuple)
assert len(z) == 4
assert all(isinstance(v, float) for v in z)
def test_zone_inconnue_retourne_none(self):
assert get_zone("recueil", "zone_qui_nexiste_pas") is None
assert get_zone("page_fantaisiste", "whatever") is None
def test_config_explicite(self):
cfg = {
"recueil": {
"my_zone": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0},
},
}
assert get_zone("recueil", "my_zone", config=cfg) == (0.0, 0.0, 1.0, 1.0)