Compare commits

...

2 Commits

Author SHA1 Message Date
Dom
3a87751444 test: couvrir les modules purs du pipeline (96 nouveaux tests)
Suite de tests unitaires pour tous les modules pipeline qui ne dépendent
pas du VLM — utiles pour garantir la non-régression après refactor et
servir de spec vivante de chaque fonction.

Fichiers :
- tests/test_json_utils.py   (20 tests) : parse_json_output + toutes les
  stratégies de récupération (fences, virgules manquantes, boucles vides,
  fermeture JSON, fallback _raw/_parse_error)
- tests/test_deskew.py       (11 tests) : détection Hough + correction,
  image synthétique + fixtures cache réel
- tests/test_checkboxes.py   (17 tests) : parse_ghs_injustifie,
  dark_ratio, inner_frac, et ground truth visuel sur 17 dossiers
  (mapping hash→OGC résolu au runtime pour éviter les constantes fragiles)
- tests/test_validation.py   (18 tests) : _check_cim10/ccam/ghm/ghs,
  cross-checks GHM↔GHS, annotate sur JSON vide et complet,
  preservation de l'input (copie défensive)
- tests/test_schema.py       (8 tests)  : clean_dossier retire les champs
  debug, préserve les champs métier, compacte la validation, ne modifie
  pas l'input
- tests/test_zones_config.py (8 tests)  : load/save round-trip, merge
  avec defaults, résilience JSON corrompu, get_zone

Total : 107 tests, 5.1 s d'exécution, tous passent. Aucune dépendance
GPU, s'exécutent en CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:29:23 +02:00
Dom
d326524e49 refactor(extract): décomposer en étages testables (json_utils + recueil)
extract.py contenait 4 responsabilités mélangées (320 lignes) :
parsing JSON tolérant, résolution de zones, crop Recodage avec classification
métier, orchestration. Séparation en modules cohérents :

- pipeline/json_utils.py : parsing tolérant réutilisable (strip fences,
  virgules manquantes, troncature des boucles d'objets vides, fermeture
  des structures JSON ouvertes). N'a aucune connaissance métier OGC.

- pipeline/recueil.py    : toute la logique spécifique à la page recueil —
  résolution de zones configurables, filter_cim10_codes, classification
  DP/DR/DAS par règle métier, run_recodage_crop_pass, merge_codage_reco,
  enrich_recueil (orchestration des trois : checkboxes + ghs_injustifie
  + crop Recodage). Chaque fonction est testable indépendamment du VLM.

- pipeline/extract.py    : réduit à l'orchestration pure — ingest, routing,
  boucle page par page, délégation à recueil.enrich_recueil, validation
  ATIH finale. Plus aucune logique métier enfouie.

La fonction extract_dossier garde exactement la même signature et produit
le même JSON en sortie : aucun breaking change externe.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 23:29:03 +02:00
10 changed files with 1328 additions and 260 deletions

View File

@@ -1,230 +1,84 @@
"""Orchestration d'extraction pour un dossier OGC."""
import json
import re
"""Orchestration d'extraction pour un dossier OGC.
Chaîne les étages du pipeline sans connaître leur implémentation interne :
ingest → routing → OCR page par page → enrichissement page-spécifique → validation ATIH
L'orchestration elle-même ne contient aucune logique métier : elle délègue à
`pipeline.recueil`, `pipeline.validation`, `pipeline.classify`, `pipeline.ocr_qwen`
et `pipeline.prompts`. Cela permet de tester indépendamment chaque étage.
"""
from __future__ import annotations
import time
from pathlib import Path
from PIL import Image
from .ingest import pdf_to_images
from .classify import detect_page_type, route_by_index
from .ingest import pdf_to_images
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import (
PAGE_TYPES, PROMPT_HEADER,
SCHEMA_RECUEIL_RECODAGE, RECUEIL_RECODAGE_ZONE,
)
from .checkboxes import detect_accord_desaccord, RECUEIL_ACCORD_DESACCORD, parse_ghs_injustifie, CheckboxZones
from .zones_config import load_config, get_zone
from .prompts import PAGE_TYPES, PROMPT_HEADER
from .recueil import enrich_recueil, resolve_recueil_zones
from .validation import annotate as validate_annotate
_EMPTY_OBJ_PATTERN = re.compile(
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
re.DOTALL,
)
def _run_page_ocr(ocr: QwenVLOCR, image_path: Path, ptype: str) -> tuple[dict | None, str, float]:
"""Exécute le prompt principal associé à un type de page et parse le JSON.
def _truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
"""Détecte et tronque les boucles d'objets vides.
GLM-OCR peut boucler sur `{"code":"", "position":"", "libelle":""}` quand
un tableau DAS ou actes est vide dans l'image. La sortie est alors
tronquée à `max_new_tokens` sans fermer le JSON → parse error.
On garde au plus `max_consecutive` objets vides puis on coupe.
Retourne (parsed_dict_ou_None, ocr_raw, elapsed_s). `parsed=None` quand
la page n'a pas de prompt structuré associé (concertation_med, hospit.).
"""
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
if len(matches) <= max_consecutive:
return text
# On coupe après la fin du `max_consecutive`-ième match
cut_at = matches[max_consecutive - 1].end()
return text[:cut_at]
def _close_open_json(text: str) -> str:
"""Ajoute les brackets/braces manquants pour tenter de fermer un JSON tronqué."""
# Compte les brackets non balancés en ignorant ceux entre guillemets simples/doubles
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for c in text:
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "{": depth_brace += 1
elif c == "}": depth_brace -= 1
elif c == "[": depth_bracket += 1
elif c == "]": depth_bracket -= 1
# Retirer les virgules traînantes
closed = text.rstrip().rstrip(",")
# Fermer en priorité les crochets ouverts (tableaux), puis les accolades
closed += "]" * max(0, depth_bracket)
closed += "}" * max(0, depth_brace)
return closed
def parse_json_output(raw: str) -> dict | None:
"""Tente d'extraire un JSON depuis la sortie GLM-OCR.
Stratégies successives :
1. parse direct après retrait des fences ```json
2. patch des virgules manquantes entre objets / tableaux
3. détection et troncature des boucles d'objets vides (cas fréquent sur
tableaux DAS/actes vides → boucle jusqu'à max_new_tokens)
4. fermeture des structures JSON ouvertes après troncature
"""
if not raw:
return None
text = raw.strip()
# 1) fences markdown
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 2) virgules manquantes entre `} {` et `] [`
patched = re.sub(r"\}\s*\n(\s*\{)", r"},\n\1", text)
patched = re.sub(r"\]\s*\n(\s*\[)", r"],\n\1", patched)
try:
return json.loads(patched)
except json.JSONDecodeError:
pass
# 3) troncature des boucles d'objets vides puis 4) fermeture
trimmed = _truncate_empty_loop(patched)
closed = _close_open_json(trimmed)
try:
result = json.loads(closed)
result["_truncated_loop"] = True # trace de l'intervention
return result
except json.JSONDecodeError as e:
return {"_raw": raw, "_parse_error": str(e)}
def _recueil_zones() -> tuple[tuple, CheckboxZones]:
"""Charge les zones configurables pour la page recueil.
Retourne (recodage_zone, accord_desaccord_zones). Si la config n'a pas
d'entrée, on retombe sur les constantes compilées.
"""
cfg = load_config()
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
acc = get_zone("recueil", "accord_checkbox", cfg)
des = get_zone("recueil", "desaccord_checkbox", cfg)
if acc and des:
cb = CheckboxZones(accord=acc, desaccord=des)
else:
cb = RECUEIL_ACCORD_DESACCORD
return reco, cb
def _extract_recodage_crop(image_path: Path, ocr: QwenVLOCR) -> dict | None:
"""Second passage VLM sur le crop zonal de la colonne Recodage.
Qwen nous renvoie la liste brute de tous les codes visibles (avec position
si présente). On classifie DP/DR/DAS en Python par règles :
- les 1ᵉʳ et 2ᵉ codes SANS position → DP puis DR (DR peut être vide si
le 2ᵉ code a déjà une position).
- tous les codes AVEC position → DAS.
Retourne un dict {dp, dr, das[]} ou None en cas d'échec.
"""
try:
img = Image.open(image_path)
w, h = img.size
reco_zone, _ = _recueil_zones()
x1, y1, x2, y2 = reco_zone
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path)
except Exception:
return None
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
conf = PAGE_TYPES.get(ptype)
if not conf or conf["prompt"] == PROMPT_HEADER:
return None, "", 0.0
res = ocr.run(image_path, conf["prompt"], max_new_tokens=4096)
parsed = parse_json_output(res["text"])
if not isinstance(parsed, dict) or "_parse_error" in parsed:
return None
# Filtrer : ne garder que les codes au format CIM-10. Si le crop dépasse
# malgré tout dans la zone Actes, les CCAM (4 lettres + 3 chiffres) seront
# exclus ici.
cim10_re = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
codes_raw = parsed.get("codes") or []
codes = []
for c in codes_raw:
if not isinstance(c, dict): continue
code = (c.get("code") or "").strip()
if code and cim10_re.match(code):
codes.append({
"code": code,
"position": str(c.get("position") or "").strip(),
})
# Classifier par règle métier :
# - 1er code sans position → DP
# - 2e code sans position → DR (sauf s'il est identique au DP : Qwen tend
# à dupliquer le DP quand DR est vide — on préfère DR="")
# - codes avec position → DAS
dp, dr = "", ""
das = []
dp_assigned = dr_assigned = False
for c in codes:
code, position = c["code"], c["position"]
if not position:
if not dp_assigned:
dp, dp_assigned = code, True
elif not dr_assigned:
if code == dp:
# doublon du DP → on considère que DR est vide
dr_assigned = True
else:
dr, dr_assigned = code, True
else:
das.append({"code": code, "position": ""})
else:
das.append(c)
return {
"dp": dp, "dr": dr, "das": das,
"_source": "crop_recodage",
"_elapsed_s": round(res["elapsed_s"], 2),
"_n_codes_raw": len(codes_raw),
"_n_codes_kept": len(codes),
}
return parsed, res["text"], round(res["elapsed_s"], 2)
def _merge_codage_reco(parsed: dict, reco: dict) -> None:
"""Fusionne le résultat du crop Recodage dans parsed["codage_reco"].
def _resolve_routing(images: list[Path], ocr: QwenVLOCR,
use_standard_routing: bool,
verbose: bool) -> tuple[list[str | None], list[str]]:
"""Détermine le type de chaque page, soit par ordre standard (une seule
vérification sur la page 1), soit par classification OCR page par page.
Politique : le crop est plus fiable (contexte isolé), il prime sur le
passage principal SAUF si le crop laisse vide un champ que le principal
avait bien lu.
Retourne (page_types, headers). `headers[i]` est vide si pas de classify
effectuée sur la page i.
"""
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
merged = {
"dp": reco.get("dp", "") or existing.get("dp", ""),
"dr": reco.get("dr", "") or existing.get("dr", ""),
"das": reco.get("das") or existing.get("das") or [],
}
parsed["codage_reco"] = merged
parsed.setdefault("_crop_recodage", {})["result"] = reco
page_types: list[str | None] = [None] * len(images)
headers: list[str] = [""] * len(images)
if use_standard_routing and images:
ptype1, header1 = detect_page_type(images[0], ocr)
if ptype1 == "recueil":
page_types = list(route_by_index(len(images)))
headers[0] = header1
if verbose:
print(" routing standard (page 1 = recueil OK)")
return page_types, headers
if verbose:
print(f" page 1 = {ptype1} → fallback classification")
# Fallback : classify page par page
for i, img in enumerate(images):
page_types[i], headers[i] = detect_page_type(img, ocr)
return page_types, headers
def extract_dossier(pdf_path: str | Path, verbose: bool = True,
use_standard_routing: bool = True) -> dict:
"""Pipeline complet d'un dossier : PDF → JSON structuré.
"""Pipeline complet d'un dossier : PDF → JSON structuré + annoté ATIH.
use_standard_routing=True (défaut) : route les pages par index selon
l'ordre standard OGC (6 pages), sans OCR de classification. -50% du temps.
Vérifie uniquement la page 1 pour s'assurer qu'on commence bien par
"recueil" — si non, bascule en classification complète (fallback).
Étages :
1. `ingest.pdf_to_images` : PDF → PNG 300 dpi (avec deskew auto, cache)
2. `_resolve_routing` : type de chaque page
3. `_run_page_ocr` : OCR du schéma structuré par type de page
4. `recueil.enrich_recueil` : checkboxes + crop Recodage pour la page recueil
5. `validation.annotate` : validation ATIH de tous les codes extraits
Paramètre `use_standard_routing=True` exploite l'ordre standard des 6 pages
OGC et économise 5 appels OCR par dossier. Bascule automatique sur la
classification page-à-page si la page 1 n'est pas le recueil attendu.
"""
pdf_path = Path(pdf_path)
ocr = QwenVLOCR()
@@ -235,37 +89,22 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
if verbose:
print(f"[{pdf_path.name}] {len(images)} pages converties")
# Choix de stratégie de routing
page_types = [None] * len(images)
headers = [""] * len(images)
if use_standard_routing:
# Vérif rapide sur la page 1 (seul OCR de classification)
ptype1, header1 = detect_page_type(images[0], ocr)
if ptype1 == "recueil":
page_types = route_by_index(len(images))
headers[0] = header1
if verbose:
print(f" routing standard (page 1 = recueil OK)")
else:
if verbose:
print(f" page 1 = {ptype1} → fallback classification")
use_standard_routing = False
page_types, headers = _resolve_routing(images, ocr, use_standard_routing, verbose)
result = {
_, cb_zones = resolve_recueil_zones()
result: dict = {
"fichier": pdf_path.stem,
"pdf_hash": images[0].parent.name,
"pdf_hash": images[0].parent.name if images else "",
"pages": [],
"extraction": {},
}
for idx, img_path in enumerate(images, 1):
t0 = time.time()
if use_standard_routing:
ptype = page_types[idx - 1]
header_text = headers[idx - 1]
else:
ptype, header_text = detect_page_type(img_path, ocr)
page_info = {
page_info: dict = {
"page": idx,
"type": ptype,
"header": header_text.strip(),
@@ -274,43 +113,20 @@ def extract_dossier(pdf_path: str | Path, verbose: bool = True,
if verbose:
print(f" p{idx}: {ptype}")
prompt_conf = PAGE_TYPES.get(ptype)
if prompt_conf and prompt_conf["prompt"] != PROMPT_HEADER:
res = ocr.run(img_path, prompt_conf["prompt"], max_new_tokens=4096)
parsed = parse_json_output(res["text"])
page_info["ocr_raw"] = res["text"]
parsed, ocr_raw, elapsed = _run_page_ocr(ocr, img_path, ptype) if ptype else (None, "", 0.0)
if parsed is not None:
page_info["ocr_raw"] = ocr_raw
page_info["parsed"] = parsed
page_info["elapsed_s"] = round(res["elapsed_s"], 2)
page_info["elapsed_s"] = elapsed
# Enrichissement : checkboxes + normalisation champs booléens
# sur la fiche recueil. GLM-OCR / Qwen ne lisent pas les cases
# à cocher (cf. scratch/test_prompt_crop_v2.py).
if ptype == "recueil" and isinstance(parsed, dict):
_, cb_zones = _recueil_zones()
cb = detect_accord_desaccord(img_path, cb_zones)
parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb # ratios + diff pour audit
# ghs_injustifie : Qwen renvoie parfois "0 SE 1 2 3 4 ATU FFM FSD"
# → ne garder que le chiffre 0/1 de tête
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
# Second passage : crop de la colonne Recodage pour compenser
# la sous-extraction observée sur codage_reco.* en passage principal.
reco = _extract_recodage_crop(img_path, ocr)
if reco:
_merge_codage_reco(parsed, reco)
enrich_recueil(parsed, img_path, ocr, cb_zones)
page_info["parsed"] = parsed
# Indexer par type pour accès direct dans result["extraction"]
result["extraction"][ptype] = parsed
else:
# Pages non structurées : juste l'en-tête déjà OCR
page_info["elapsed_s"] = round(time.time() - t0, 2)
result["pages"].append(page_info)
# Post-traitement : validation ATIH de tous les codes extraits
result = validate_annotate(result)
return result
return validate_annotate(result)

136
pipeline/json_utils.py Normal file
View File

@@ -0,0 +1,136 @@
"""Parsing JSON tolérant pour les sorties des VLM.
Les VLM (Qwen, GLM-OCR, GOT-OCR…) produisent du JSON avec des anomalies
fréquentes :
- Encadrement par des fences markdown ```json ... ```
- Virgules manquantes entre objets ou éléments de tableau
- Boucles pathologiques d'objets vides `{"code":"","position":""}` répétés
jusqu'à `max_new_tokens`, ce qui tronque le JSON sans le fermer proprement
Ce module expose `parse_json_output()` qui applique plusieurs stratégies de
récupération avant d'abandonner. En dernier recours, il renvoie un dict avec
`_raw` + `_parse_error` pour audit, jamais `None` (ce qui casserait le pipeline).
"""
from __future__ import annotations
import json
import re
# Pattern qui matche un objet vide générique {"code":"","position":"",...}
_EMPTY_OBJ_PATTERN = re.compile(
r'\{\s*"code"\s*:\s*""\s*,\s*"position"\s*:\s*""\s*(?:,\s*"libelle"\s*:\s*""\s*)?\}',
re.DOTALL,
)
_FENCE_OPEN_RE = re.compile(r"^```(?:json)?\s*")
_FENCE_CLOSE_RE = re.compile(r"\s*```$")
_MISSING_COMMA_OBJ_RE = re.compile(r"\}\s*\n(\s*\{)")
_MISSING_COMMA_ARR_RE = re.compile(r"\]\s*\n(\s*\[)")
def strip_fences(text: str) -> str:
"""Retire un éventuel encadrement ```json ... ```."""
text = _FENCE_OPEN_RE.sub("", text.strip())
text = _FENCE_CLOSE_RE.sub("", text)
return text
def patch_missing_commas(text: str) -> str:
"""Ajoute les virgules manquantes entre `}\\n{` et `]\\n[`.
Les VLM omettent fréquemment ces virgules dans leurs sorties JSON.
"""
text = _MISSING_COMMA_OBJ_RE.sub(r"},\n\1", text)
text = _MISSING_COMMA_ARR_RE.sub(r"],\n\1", text)
return text
def truncate_empty_loop(text: str, max_consecutive: int = 2) -> str:
"""Tronque les boucles d'objets vides (`{"code":"","position":""}` répété).
Cas d'usage : quand un tableau DAS ou Actes est vide dans l'image, le
VLM a parfois tendance à générer le même objet vide en boucle jusqu'à
saturer `max_new_tokens`. La sortie est alors tronquée sans fermer le
JSON → parse error. Garder au maximum `max_consecutive` occurrences.
"""
matches = list(_EMPTY_OBJ_PATTERN.finditer(text))
if len(matches) <= max_consecutive:
return text
cut_at = matches[max_consecutive - 1].end()
return text[:cut_at]
def close_open_json(text: str) -> str:
"""Ajoute les brackets/braces manquants pour fermer un JSON tronqué.
Compte les accolades et crochets non-balancés en ignorant ceux à
l'intérieur d'une chaîne, puis ferme dans le bon ordre (tableaux
ouverts d'abord, puis objets).
"""
depth_brace = 0
depth_bracket = 0
in_string = False
escape = False
for c in text:
if escape:
escape = False
continue
if c == "\\":
escape = True
continue
if c == '"':
in_string = not in_string
continue
if in_string:
continue
if c == "{":
depth_brace += 1
elif c == "}":
depth_brace -= 1
elif c == "[":
depth_bracket += 1
elif c == "]":
depth_bracket -= 1
closed = text.rstrip().rstrip(",")
closed += "]" * max(0, depth_bracket)
closed += "}" * max(0, depth_brace)
return closed
def parse_json_output(raw: str) -> dict | None:
"""Parse une sortie VLM en dict. Applique plusieurs stratégies :
1. strip des fences markdown ```json
2. parse direct
3. patch des virgules manquantes
4. troncature des boucles d'objets vides + fermeture du JSON
En cas d'échec de toutes les stratégies, retourne
`{"_raw": raw, "_parse_error": str}` pour permettre l'audit manuel
plutôt que de casser le pipeline.
"""
if not raw:
return None
text = strip_fences(raw)
try:
return json.loads(text)
except json.JSONDecodeError:
pass
patched = patch_missing_commas(text)
try:
return json.loads(patched)
except json.JSONDecodeError:
pass
trimmed = truncate_empty_loop(patched)
closed = close_open_json(trimmed)
try:
result = json.loads(closed)
if isinstance(result, dict):
result["_truncated_loop"] = True
return result
except json.JSONDecodeError as e:
return {"_raw": raw, "_parse_error": str(e)}

203
pipeline/recueil.py Normal file
View File

@@ -0,0 +1,203 @@
"""Logique spécifique à la page `recueil` (fiche médicale de recueil OGC).
Regroupe tout ce qui concerne cette page — la plus riche et la plus difficile
à extraire du dossier OGC — séparé de l'orchestration générale :
- Résolution des zones configurables (crop Recodage, checkboxes)
- Second passage VLM sur le crop de la colonne Recodage
- Fusion du résultat crop dans le JSON principal
- Classification des codes CIM-10 en DP/DR/DAS par règle métier
- Enrichissement post-extraction (checkboxes Accord/Désaccord, ghs_injustifie)
Les fonctions sont testables indépendamment de Qwen quand on leur fournit
déjà les sorties OCR brutes.
"""
from __future__ import annotations
import re
import time
from pathlib import Path
from typing import Any
from PIL import Image
from .checkboxes import (
CheckboxZones,
RECUEIL_ACCORD_DESACCORD,
detect_accord_desaccord,
parse_ghs_injustifie,
)
from .json_utils import parse_json_output
from .ocr_qwen import QwenVLOCR
from .prompts import RECUEIL_RECODAGE_ZONE, SCHEMA_RECUEIL_RECODAGE
from .zones_config import get_zone, load_config
# ============================================================
# Résolution des zones (config JSON + fallback sur les defaults)
# ============================================================
def resolve_recueil_zones() -> tuple[tuple[float, float, float, float], CheckboxZones]:
"""Charge les zones de la page recueil depuis la config utilisateur,
avec fallback sur les constantes compilées si la config est absente.
Retourne (zone_crop_recodage, zones_accord_desaccord).
"""
cfg = load_config()
reco = get_zone("recueil", "codage_reco", cfg) or RECUEIL_RECODAGE_ZONE
acc = get_zone("recueil", "accord_checkbox", cfg)
des = get_zone("recueil", "desaccord_checkbox", cfg)
if acc and des:
cb = CheckboxZones(accord=acc, desaccord=des)
else:
cb = RECUEIL_ACCORD_DESACCORD
return reco, cb
# ============================================================
# Classification CIM-10 → DP / DR / DAS (pur, testable sans VLM)
# ============================================================
CIM10_RE = re.compile(r"^[A-Z]\d{2,4}\s*\*?\s*\+?\d*$")
def filter_cim10_codes(codes_raw: list[Any]) -> list[dict]:
"""Filtre une liste de codes OCR bruts pour ne garder que les CIM-10.
Les VLM peuvent parfois lire des codes CCAM (actes) dans un crop qui
dépasse sur le bloc Actes. On les retire ici pour ne pas polluer les DAS.
"""
kept = []
for c in codes_raw or []:
if not isinstance(c, dict):
continue
code = (c.get("code") or "").strip()
if code and CIM10_RE.match(code):
kept.append({
"code": code,
"position": str(c.get("position") or "").strip(),
})
return kept
def classify_codes_dp_dr_das(codes: list[dict]) -> tuple[str, str, list[dict]]:
"""Classifie une liste de codes {code, position} en DP, DR et liste de DAS.
Règle métier :
- 1er code sans position → DP
- 2e code sans position → DR (ignoré si identique au DP : le VLM peut
dupliquer le DP quand la case DR est visuellement vide)
- tous les codes avec position → DAS
- codes sans position au-delà du 2e → DAS sans position (pour ne rien perdre)
"""
dp, dr = "", ""
das: list[dict] = []
dp_assigned = dr_assigned = False
for c in codes:
code, position = c["code"], c["position"]
if not position:
if not dp_assigned:
dp, dp_assigned = code, True
elif not dr_assigned:
if code == dp:
dr_assigned = True # doublon DP → DR vide
else:
dr, dr_assigned = code, True
else:
das.append({"code": code, "position": ""})
else:
das.append(c)
return dp, dr, das
# ============================================================
# Second passage VLM sur crop Recodage
# ============================================================
def run_recodage_crop_pass(image_path: Path, ocr: QwenVLOCR,
zone: tuple[float, float, float, float] | None = None
) -> dict | None:
"""Execute un second passage VLM sur le crop zonal de la colonne Recodage.
Sauvegarde le crop à côté de l'image source (suffixe `_recodage.png`)
pour audit. Retourne un dict avec `dp/dr/das` + métadonnées, ou None
en cas d'échec d'OCR ou de parsing.
"""
try:
img = Image.open(image_path)
w, h = img.size
z = zone
if z is None:
z, _ = resolve_recueil_zones()
x1, y1, x2, y2 = z
crop = img.crop((int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)))
crop_path = image_path.parent / f"{image_path.stem}_recodage.png"
crop.save(crop_path)
except Exception:
return None
t0 = time.time()
res = ocr.run(crop_path, SCHEMA_RECUEIL_RECODAGE, max_new_tokens=1024)
parsed = parse_json_output(res["text"])
if not isinstance(parsed, dict) or "_parse_error" in parsed:
return None
codes = filter_cim10_codes(parsed.get("codes") or [])
dp, dr, das = classify_codes_dp_dr_das(codes)
return {
"dp": dp, "dr": dr, "das": das,
"_source": "crop_recodage",
"_elapsed_s": round(res["elapsed_s"], 2),
"_n_codes_raw": len(parsed.get("codes") or []),
"_n_codes_kept": len(codes),
}
def merge_codage_reco(parsed: dict, reco: dict) -> None:
"""Fusionne le résultat du crop Recodage dans `parsed["codage_reco"]`.
Politique de merge : le crop est plus fiable (contexte isolé) donc il
prime sur le passage principal. Exception : si un champ du crop est vide
mais que le passage principal l'a rempli, on garde celui du passage
principal (on ne dégrade jamais un résultat existant).
"""
existing = parsed.get("codage_reco") if isinstance(parsed.get("codage_reco"), dict) else {}
parsed["codage_reco"] = {
"dp": reco.get("dp", "") or existing.get("dp", ""),
"dr": reco.get("dr", "") or existing.get("dr", ""),
"das": reco.get("das") or existing.get("das") or [],
}
parsed.setdefault("_crop_recodage", {})["result"] = reco
# ============================================================
# Enrichissement post-extraction d'une page recueil
# ============================================================
def enrich_recueil(parsed: dict, image_path: Path, ocr: QwenVLOCR,
cb_zones: CheckboxZones | None = None) -> dict:
"""Enrichit un JSON recueil parsé avec :
- checkbox accord/désaccord (méthode densité pixels, indépendante du VLM)
- normalisation `ghs_injustifie` → 0 / 1 / ""
- second passage VLM sur le crop Recodage si besoin, fusionné dans `codage_reco`
Modifie `parsed` en place et le renvoie (pratique pour chaînage).
"""
if not isinstance(parsed, dict):
return parsed
zones = cb_zones or resolve_recueil_zones()[1]
# Checkboxes accord / désaccord
cb = detect_accord_desaccord(image_path, zones)
parsed["accord_desaccord"] = cb["decision"]
parsed["_checkbox_debug"] = cb
# Normalisation ghs_injustifie
parsed["ghs_injustifie"] = parse_ghs_injustifie(parsed.get("ghs_injustifie", ""))
# Second passage Recodage
reco = run_recodage_crop_pass(image_path, ocr)
if reco:
merge_codage_reco(parsed, reco)
return parsed

160
tests/test_checkboxes.py Normal file
View File

@@ -0,0 +1,160 @@
"""Tests unitaires pour pipeline.checkboxes."""
from __future__ import annotations
import numpy as np
import pytest
from PIL import Image
from pipeline.checkboxes import (
AMBIGU_MARGIN,
CheckboxZones,
RECUEIL_ACCORD_DESACCORD,
dark_ratio,
detect_accord_desaccord,
parse_ghs_injustifie,
)
# ============================================================
# parse_ghs_injustifie
# ============================================================
class TestParseGhsInjustifie:
@pytest.mark.parametrize("raw,expected", [
("0", "0"),
("1", "1"),
("0 SE 1 2 3 4 ATU FFM FSD", "0"),
("1 SE 2 ATU", "1"),
(" 0 ", "0"),
("", ""),
(None, ""),
("SE 1 2 3 4 ATU FFM FSD", ""), # pas de chiffre de tête
("abc", ""),
("2 SE 1", ""), # 2 n'est ni 0 ni 1
])
def test_cas_varies(self, raw, expected):
assert parse_ghs_injustifie(raw) == expected
# ============================================================
# dark_ratio (avec images synthétiques)
# ============================================================
def _solid_image(w: int, h: int, gray_value: int = 255) -> Image.Image:
arr = np.full((h, w), gray_value, dtype=np.uint8)
return Image.fromarray(arr, mode="L").convert("RGB")
def _image_with_dark_square(w: int, h: int,
square_bbox: tuple[float, float, float, float]) -> Image.Image:
"""Image blanche avec un carré noir dans la zone bbox (coords relatives)."""
arr = np.full((h, w), 255, dtype=np.uint8)
x1, y1, x2, y2 = square_bbox
arr[int(y1*h):int(y2*h), int(x1*w):int(x2*w)] = 0
return Image.fromarray(arr, mode="L").convert("RGB")
class TestDarkRatio:
def test_image_blanche(self):
img = _solid_image(100, 100, 255)
ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
assert ratio == 0.0
def test_image_noire(self):
img = _solid_image(100, 100, 0)
ratio = dark_ratio(img, (0.2, 0.2, 0.8, 0.8))
assert ratio == 1.0
def test_inner_frac_ignore_les_bords(self):
"""Un carré noir occupe toute la zone mais avec un grand inner_frac
on ne voit que le centre, qui reste dans la zone noire."""
img = _image_with_dark_square(100, 100, (0.0, 0.0, 1.0, 1.0))
# Tout noir, peu importe inner_frac
assert dark_ratio(img, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35) == 1.0
def test_cadre_seul_vs_contenu_central(self):
"""Une case 'vide' (cadre seul) doit avoir un ratio inner_frac faible ;
une case 'cochée' (croix au centre) doit avoir un ratio plus élevé."""
# Simuler un cadre : carré noir sur le pourtour uniquement
w, h = 100, 100
arr = np.full((h, w), 255, dtype=np.uint8)
arr[:5, :] = 0; arr[-5:, :] = 0; arr[:, :5] = 0; arr[:, -5:] = 0
frame_only = Image.fromarray(arr, mode="L").convert("RGB")
# Cadre + croix au centre
arr2 = arr.copy()
# Une croix : 2 diagonales
for i in range(20, 80):
arr2[i, i] = 0
arr2[i, 100 - 1 - i] = 0
checked = Image.fromarray(arr2, mode="L").convert("RGB")
ratio_empty = dark_ratio(frame_only, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)
ratio_full = dark_ratio(checked, (0.0, 0.0, 1.0, 1.0), inner_frac=0.35)
# La case cochée doit avoir un ratio clairement plus élevé
assert ratio_full > ratio_empty + 0.05
# ============================================================
# detect_accord_desaccord (fixtures cache)
# ============================================================
class TestDetectAccordDesaccord:
"""Tests sur les images réelles du cache, avec ground truth vérifié
visuellement (cf. historique du projet, crops audités un par un).
Ground truth indexé par numéro d'OGC — le mapping vers le hash du cache
est résolu au runtime via pipeline.ingest.pdf_hash pour éviter de coder
les hashes en dur (fragile).
"""
# Ground truth vérifié visuellement sur les 18 dossiers 2018 CARC
GROUND_TRUTH_BY_OGC = {
1: "accord",
7: "accord",
9: "désaccord",
18: "désaccord",
20: "désaccord",
27: "désaccord",
29: "accord",
55: "accord",
66: "désaccord",
68: "accord",
69: "accord",
74: "désaccord",
76: "désaccord",
84: "accord",
86: "désaccord",
97: "accord",
99: "désaccord",
}
@pytest.fixture
def cached_pages_with_truth(self):
"""Résout le mapping numéro OGC → page_01.png disponible au runtime."""
from pathlib import Path
from pipeline.ingest import pdf_hash
pdf_dir = Path("2018 CARC")
if not pdf_dir.is_dir():
pytest.skip("répertoire 2018 CARC/ absent")
found = {}
for n, expected in self.GROUND_TRUTH_BY_OGC.items():
pdf = pdf_dir / f"OGC {n}.pdf"
if not pdf.exists():
continue
h = pdf_hash(str(pdf))
img = Path(f".cache/images/{h}/page_01.png")
if img.exists():
found[f"OGC {n}"] = (str(img), expected)
if not found:
pytest.skip("pas de cache d'images disponible — lance le pipeline d'abord")
return found
def test_ground_truth_echantillon(self, cached_pages_with_truth):
"""Sur les cas vérifiés visuellement, le détecteur doit matcher."""
errors = []
for name, (path, expected) in cached_pages_with_truth.items():
r = detect_accord_desaccord(path)
if r["decision"] != expected:
errors.append(f"{name}: attendu={expected}, got={r}")
assert not errors, "\n".join(errors)

140
tests/test_deskew.py Normal file
View File

@@ -0,0 +1,140 @@
"""Tests unitaires pour pipeline.deskew.
Tests sans dépendance GPU. Génère des images synthétiques en code + utilise
les images du cache pour les cas réels.
"""
from __future__ import annotations
import math
from pathlib import Path
import numpy as np
import pytest
from PIL import Image
from pipeline.deskew import (
MAX_ANGLE_DEG,
MIN_ANGLE_DEG,
NEAR_HORIZONTAL_BAND,
deskew_image,
detect_skew_angle,
)
# ============================================================
# Helpers : fabriquer une image synthétique avec des lignes
# ============================================================
def _make_grid_image(w: int = 800, h: int = 1000,
n_lines: int = 30, angle_deg: float = 0.0) -> Image.Image:
"""Crée une image blanche avec `n_lines` lignes horizontales équi-réparties,
optionnellement tournée d'un angle donné. Parfaite pour tester le détecteur.
"""
arr = np.ones((h, w), dtype=np.uint8) * 255
for i in range(1, n_lines + 1):
y = int(i * h / (n_lines + 1))
arr[y - 1:y + 1, 50:w - 50] = 0 # ligne horizontale noire de 2 px
img = Image.fromarray(arr, mode="L")
if angle_deg != 0.0:
# PIL.rotate : angle positif = sens trigonométrique (= anti-horaire)
# On veut tester avec notre convention (positif = horaire) donc
# on inverse ici pour cohérence avec detect_skew_angle
img = img.rotate(-angle_deg, resample=Image.Resampling.BICUBIC,
expand=False, fillcolor="white")
return img.convert("RGB")
# ============================================================
# Tests de détection
# ============================================================
class TestDetectSkewAngle:
def test_image_parfaitement_droite(self):
img = _make_grid_image()
angle = detect_skew_angle(img)
assert abs(angle) < 0.1, f"image droite doit donner ~0°, got {angle}"
@pytest.mark.parametrize("input_angle", [1.0, 2.0, -3.0, 4.0])
def test_detecte_angles_modérés(self, input_angle):
"""Sur notre image synthétique (30 lignes), la sensibilité est ~1°.
Sur de vraies fiches OGC avec 300+ lignes de tableaux, la sensibilité
descend à 0.3° (cf. test réel sur OGC 1 : +0.91° détecté).
"""
img = _make_grid_image(angle_deg=input_angle)
detected = detect_skew_angle(img)
assert abs(detected - input_angle) < 0.5, \
f"attendu ~{input_angle}°, détecté {detected}°"
def test_image_sans_lignes_retourne_zero(self):
# Image totalement uniforme → aucune ligne détectable
arr = np.ones((500, 500), dtype=np.uint8) * 255
img = Image.fromarray(arr, mode="L").convert("RGB")
assert detect_skew_angle(img) == 0.0
def test_angle_extrême_rejeté(self):
# Une rotation de 45° dépasse MAX_ANGLE_DEG → on refuse de corriger
img = _make_grid_image(angle_deg=45.0)
detected = detect_skew_angle(img)
# Soit 0.0 (pas de lignes quasi-horizontales à ±15°), soit borné
assert abs(detected) < MAX_ANGLE_DEG or detected == 0.0
# ============================================================
# Tests de correction (deskew_image)
# ============================================================
class TestDeskewImage:
def test_image_droite_inchangée(self):
img = _make_grid_image()
rotated, applied = deskew_image(img)
assert applied == 0.0
# Identité bit à bit
assert np.array_equal(np.array(rotated), np.array(img))
def test_image_inclinée_corrigée(self):
img = _make_grid_image(angle_deg=2.0)
rotated, applied = deskew_image(img)
# On attend qu'on applique un angle proche de 2° (convention positive)
assert abs(applied) > MIN_ANGLE_DEG, \
f"devrait corriger, got applied={applied}"
# Après rotation, l'angle résiduel doit être très faible
residual = detect_skew_angle(rotated)
assert abs(residual) < 0.5, \
f"angle résiduel trop grand après correction : {residual}°"
def test_seuil_min_angle_respecté(self):
# Un skew juste sous le seuil ne doit pas être corrigé
img = _make_grid_image(angle_deg=MIN_ANGLE_DEG / 2)
_, applied = deskew_image(img)
assert applied == 0.0
def test_angle_forcé(self):
"""On peut forcer un angle arbitraire indépendamment de la détection."""
img = _make_grid_image() # droit
rotated, applied = deskew_image(img, angle=5.0)
assert applied == 5.0
# Taille conservée
assert rotated.size == img.size
# ============================================================
# Tests avec fixtures réelles (si cache dispo)
# ============================================================
class TestOnRealCachedPages:
"""Ces tests s'exécutent seulement si le cache d'images existe."""
@pytest.fixture
def cached_pages(self):
paths = sorted(Path(".cache/images").glob("*/page_01.png"))
if not paths:
pytest.skip("pas de cache d'images disponible")
return paths
def test_detection_ne_crash_pas(self, cached_pages):
"""Sur toutes les pages cachées, detect_skew_angle ne doit pas planter."""
for p in cached_pages[:5]: # limite pour la vitesse
img = Image.open(p)
angle = detect_skew_angle(img)
assert isinstance(angle, float)
assert abs(angle) <= MAX_ANGLE_DEG

119
tests/test_json_utils.py Normal file
View File

@@ -0,0 +1,119 @@
"""Tests unitaires pour pipeline.json_utils."""
from __future__ import annotations
from pipeline.json_utils import (
close_open_json,
parse_json_output,
patch_missing_commas,
strip_fences,
truncate_empty_loop,
)
class TestStripFences:
def test_fence_json(self):
raw = '```json\n{"a": 1}\n```'
assert strip_fences(raw).strip() == '{"a": 1}'
def test_fence_simple(self):
raw = '```\n{"a": 1}\n```'
assert strip_fences(raw).strip() == '{"a": 1}'
def test_pas_de_fence(self):
raw = '{"a": 1}'
assert strip_fences(raw).strip() == '{"a": 1}'
class TestPatchMissingCommas:
def test_objets_consecutifs(self):
raw = '[\n{"a": 1}\n{"b": 2}\n]'
patched = patch_missing_commas(raw)
assert '},' in patched
def test_deja_correct(self):
raw = '{"a": 1}'
assert patch_missing_commas(raw) == raw
class TestTruncateEmptyLoop:
def test_moins_que_seuil(self):
raw = '[{"code":"","position":""},{"code":"","position":""}]'
# 2 objets vides = seuil par défaut, rien à tronquer
out = truncate_empty_loop(raw, max_consecutive=2)
assert out == raw
def test_boucle_tronquée(self):
objs = ['{"code":"","position":""}'] * 10
raw = '[' + ','.join(objs)
out = truncate_empty_loop(raw, max_consecutive=2)
# Après troncature, ne doit contenir que 2 occurrences
assert out.count('{"code":""') == 2
def test_pas_de_boucle(self):
raw = '[{"code":"K650","position":"1"}]'
assert truncate_empty_loop(raw) == raw
class TestCloseOpenJson:
def test_deja_ferme(self):
raw = '{"a": [1, 2]}'
assert close_open_json(raw) == raw
def test_accolade_manquante(self):
raw = '{"a": 1'
closed = close_open_json(raw)
assert closed == '{"a": 1}'
def test_crochet_manquant(self):
raw = '{"a": [1, 2'
closed = close_open_json(raw)
assert closed == '{"a": [1, 2]}'
def test_accolades_et_crochets_imbriqués(self):
raw = '{"a": {"b": [1, 2'
closed = close_open_json(raw)
assert closed == '{"a": {"b": [1, 2]}}'
def test_virgule_trainante_supprimée(self):
raw = '{"a": 1, '
closed = close_open_json(raw)
assert closed == '{"a": 1}'
def test_accolade_dans_string_ignorée(self):
raw = '{"a": "{ ceci est une { accolade dans une string"'
closed = close_open_json(raw)
# On ajoute juste l'accolade finale manquante
assert closed == raw + '}'
class TestParseJsonOutput:
def test_json_valide(self):
assert parse_json_output('{"a": 1}') == {"a": 1}
def test_vide(self):
assert parse_json_output("") is None
assert parse_json_output(None) is None
def test_fences_markdown(self):
assert parse_json_output('```json\n{"a": 1}\n```') == {"a": 1}
def test_virgule_manquante_recuperee(self):
raw = '[\n{"a": 1}\n{"b": 2}\n]'
result = parse_json_output(raw)
assert result == [{"a": 1}, {"b": 2}]
def test_boucle_tronquée_fermée(self):
objs = ['{"code":"","position":"","libelle":""}'] * 10
raw = '{"das": [\n' + ',\n'.join(objs) # non fermé
result = parse_json_output(raw)
assert isinstance(result, dict)
assert "das" in result
# Après troncature, 2 objets vides max, puis JSON refermé
assert result.get("_truncated_loop") is True
def test_fallback_retourne_raw(self):
"""Quand rien ne marche, on renvoie un dict avec _raw + _parse_error."""
raw = "ceci n'est pas du JSON du tout !"
result = parse_json_output(raw)
assert result.get("_raw") == raw
assert "_parse_error" in result

145
tests/test_recueil.py Normal file
View File

@@ -0,0 +1,145 @@
"""Tests unitaires pour pipeline.recueil (logique métier de la page recueil).
Les fonctions testées ici sont toutes pures (pas d'appel au VLM) :
- filter_cim10_codes
- classify_codes_dp_dr_das
- merge_codage_reco
- resolve_recueil_zones (juste lecture de config)
"""
from __future__ import annotations
from pipeline.recueil import (
classify_codes_dp_dr_das,
filter_cim10_codes,
merge_codage_reco,
resolve_recueil_zones,
)
class TestFilterCim10Codes:
def test_codes_valides_conservés(self):
codes = [
{"code": "K650", "position": "1"},
{"code": "T814", "position": "2"},
{"code": "Z954 *", "position": "3"},
]
out = filter_cim10_codes(codes)
assert len(out) == 3
assert out[0]["code"] == "K650"
def test_ccam_rejeté(self):
"""Un code CCAM (4 lettres + 3 chiffres) ne doit pas passer le filtre CIM-10."""
codes = [
{"code": "K650", "position": ""},
{"code": "EBFA012", "position": "1"}, # CCAM
]
out = filter_cim10_codes(codes)
assert len(out) == 1
assert out[0]["code"] == "K650"
def test_code_vide_rejeté(self):
codes = [{"code": "", "position": ""}, {"code": "K650", "position": ""}]
out = filter_cim10_codes(codes)
assert len(out) == 1
def test_non_dict_ignoré(self):
codes = ["K650", None, {"code": "T814", "position": ""}]
out = filter_cim10_codes(codes)
assert len(out) == 1
def test_liste_vide(self):
assert filter_cim10_codes([]) == []
assert filter_cim10_codes(None) == []
class TestClassifyCodesDpDrDas:
def test_cas_nominal(self):
"""1er sans position = DP, 2e sans position = DR, puis DAS avec positions."""
codes = [
{"code": "K650", "position": ""},
{"code": "T814", "position": ""},
{"code": "Z954", "position": "2"},
{"code": "R33", "position": "3"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == "T814"
assert [d["code"] for d in das] == ["Z954", "R33"]
def test_dr_vide_non_duplique_dp(self):
"""Quand Qwen duplique le DP (parce que DR est visuellement vide),
on doit considérer que DR est vide, pas DR = DP."""
codes = [
{"code": "K650", "position": ""},
{"code": "K650", "position": ""}, # doublon
{"code": "T814", "position": "2"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == "" # dédupliqué
assert len(das) == 1
def test_seulement_dp(self):
codes = [{"code": "K650", "position": ""}]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == "K650"
assert dr == ""
assert das == []
def test_tous_avec_positions(self):
"""Si tous les codes ont une position, DP et DR sont vides, tout en DAS."""
codes = [
{"code": "K650", "position": "1"},
{"code": "T814", "position": "2"},
]
dp, dr, das = classify_codes_dp_dr_das(codes)
assert dp == ""
assert dr == ""
assert len(das) == 2
def test_vide(self):
dp, dr, das = classify_codes_dp_dr_das([])
assert (dp, dr, das) == ("", "", [])
class TestMergeCodageReco:
def test_crop_prime_sur_passage_principal(self):
parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
reco = {"dp": "K650", "dr": "T814",
"das": [{"code": "Z954", "position": "2"}]}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650"
assert parsed["codage_reco"]["dr"] == "T814"
assert len(parsed["codage_reco"]["das"]) == 1
def test_crop_vide_garde_passage_principal(self):
"""Si le crop a un champ vide mais le passage principal l'avait rempli,
on ne dégrade pas : on garde le passage principal."""
parsed = {"codage_reco": {"dp": "K650", "dr": "", "das": []}}
reco = {"dp": "", "dr": "", "das": []}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650" # préservé
def test_codage_reco_initialement_absent(self):
parsed = {}
reco = {"dp": "K650", "dr": "", "das": []}
merge_codage_reco(parsed, reco)
assert parsed["codage_reco"]["dp"] == "K650"
def test_trace_crop_ajoutee(self):
parsed = {"codage_reco": {"dp": "", "dr": "", "das": []}}
reco = {"dp": "K650", "_elapsed_s": 1.5}
merge_codage_reco(parsed, reco)
assert parsed["_crop_recodage"]["result"]["_elapsed_s"] == 1.5
class TestResolveRecueilZones:
def test_fallback_constantes(self):
"""Sans config utilisateur, on a les zones par défaut."""
reco, cb = resolve_recueil_zones()
# 4 coords flottantes
assert len(reco) == 4
assert all(isinstance(v, float) for v in reco)
# Checkbox zones
assert len(cb.accord) == 4
assert len(cb.desaccord) == 4

118
tests/test_schema.py Normal file
View File

@@ -0,0 +1,118 @@
"""Tests unitaires pour pipeline.schema (nettoyage JSON)."""
from __future__ import annotations
from pipeline.schema import (
CLEAN_FIELDS_RECUEIL,
DEBUG_FIELDS,
SCHEMA_VERSION,
clean_dossier,
)
def _sample_raw():
"""Un JSON pipeline type, riche en champs debug."""
return {
"fichier": "OGC 7",
"pdf_hash": "abc123",
"pages": [{"page": 1, "type": "recueil"}],
"extraction": {
"recueil": {
"etablissement": "CLINIQUE X",
"finess": "330780206",
"ghm_etab": "11M122",
"ghs_etab": "4323",
"codage_etab": {"dp": "K650"},
"accord_desaccord": "accord",
"_checkbox_debug": {"ratio_accord": 0.38, "ratio_desaccord": 0.19},
"_parse_error": "whatever",
"_truncated_loop": True,
"_crop_recodage": {"dp": "K650", "_source": "crop"},
"_validation": {
"summary": {"valid": 3, "invalid": 0, "empty": 2, "total_codes": 3},
"cross_checks": {
"etab": {"checked": True, "coherent": True},
"reco": {"checked": False, "reason": "ghm manquant"},
},
"codage_etab": {
"dp": {"code": "K650", "valid": True, "libelle_ref": "Péritonite"},
"dr": {"code": "", "valid": None},
"das": [],
},
"codage_reco": {"dp": {}, "dr": {}, "das": []},
"ghm_etab": {"code": "11M122", "valid": True,
"ghs_possibles": ["4323"]},
"ghs_etab": {"code": "4323", "valid": True},
"ghm_reco": {"code": "", "valid": None},
"ghs_reco": {"code": "", "valid": None},
},
},
"concertation_2": {
"ghs_initial": "4323",
"ghs_final": "4323",
"decision": "retour_groupage_dim",
"date_concertation": "13/03/2018",
},
},
"_meta": {"pipeline_version": "v2", "ocr_model": "Qwen/Qwen2.5-VL-3B-Instruct"},
}
class TestCleanDossier:
def test_retourne_schema_version(self):
out = clean_dossier(_sample_raw())
assert out["schema_version"] == SCHEMA_VERSION
def test_retire_tous_les_champs_debug(self):
"""Aucun champ de DEBUG_FIELDS ne doit rester dans la sortie clean."""
out = clean_dossier(_sample_raw())
rec = out["extraction"]["recueil"]
for debug_field in DEBUG_FIELDS:
assert debug_field not in rec, \
f"{debug_field} devrait être retiré"
def test_garde_les_champs_metier(self):
out = clean_dossier(_sample_raw())
rec = out["extraction"]["recueil"]
for f in ["etablissement", "finess", "ghm_etab", "ghs_etab",
"codage_etab", "accord_desaccord"]:
assert f in rec, f"{f} doit être présent dans clean"
def test_validation_compactee(self):
"""La validation est conservée mais en format compact."""
out = clean_dossier(_sample_raw())
v = out["extraction"]["recueil"]["_validation"]
# summary garde tel quel
assert v["summary"]["valid"] == 3
# cross_checks compactés : juste le coherent booléen (ou None)
assert v["cross_checks"] == {
"etab_ghm_ghs_coherent": True,
"reco_ghm_ghs_coherent": None,
}
# Les codes validés gardent libelle_ref quand dispo
assert v["codage_etab"]["dp"]["valid"] is True
assert v["codage_etab"]["dp"].get("libelle_ref") == "Péritonite"
def test_concertation_2_conservee(self):
out = clean_dossier(_sample_raw())
c2 = out["extraction"]["concertation_2"]
assert c2["ghs_initial"] == "4323"
assert c2["decision"] == "retour_groupage_dim"
def test_champs_inconnus_ignorés(self):
"""Un champ qui n'est pas dans CLEAN_FIELDS_RECUEIL est retiré."""
raw = _sample_raw()
raw["extraction"]["recueil"]["champ_inventé"] = "poubelle"
out = clean_dossier(raw)
assert "champ_inventé" not in out["extraction"]["recueil"]
def test_meta_preservee(self):
out = clean_dossier(_sample_raw())
assert out["_meta"]["pipeline_version"] == "v2"
assert "Qwen" in out["_meta"]["ocr_model"]
def test_pas_de_modification_input(self):
"""La fonction ne doit pas modifier l'input."""
raw = _sample_raw()
before = raw["extraction"]["recueil"].copy()
_ = clean_dossier(raw)
assert raw["extraction"]["recueil"] == before

146
tests/test_validation.py Normal file
View File

@@ -0,0 +1,146 @@
"""Tests unitaires pour pipeline.validation."""
from __future__ import annotations
import pytest
from pipeline.validation import (
_check_ccam,
_check_cim10,
_check_ghm,
_check_ghs,
_cross_check_ghm_ghs,
annotate,
validate_recueil,
)
# ============================================================
# Vérifications par type de code
# ============================================================
class TestCheckCim10:
def test_code_valide(self):
r = _check_cim10("K650")
assert r["valid"] is True
assert "libelle_ref" in r
def test_code_vide(self):
assert _check_cim10("")["valid"] is None
assert _check_cim10(None)["valid"] is None
def test_code_avec_suffixe_pmsi(self):
# Les suffixes * et +N sont gérés par la normalisation
r = _check_cim10("C795 *")
assert r["valid"] is True
def test_code_invalide_avec_suggestion(self):
# K65O (O au lieu de 0) n'existe pas, mais K650 oui
r = _check_cim10("K65O")
assert r["valid"] is False
assert r.get("suggestion") == "K650"
def test_code_invalide_sans_suggestion(self):
# Code farfelu sans voisin proche
r = _check_cim10("ZZZZ9999")
assert r["valid"] is False
# suggestion peut être absente
assert r.get("suggestion") is None or r.get("suggestion") != "ZZZZ9999"
class TestCheckGhm:
def test_ghm_valide(self):
r = _check_ghm("11M122")
assert r["valid"] is True
assert isinstance(r.get("ghs_possibles"), list)
assert len(r["ghs_possibles"]) > 0
def test_ghm_invalide(self):
r = _check_ghm("99Z999")
assert r["valid"] is False
class TestCheckGhs:
def test_ghs_valide(self):
assert _check_ghs("4323")["valid"] is True
def test_ghs_invalide(self):
assert _check_ghs("99999")["valid"] is False
class TestCheckCcam:
def test_ccam_valide(self):
assert _check_ccam("EBFA012")["valid"] is True
def test_ccam_invalide(self):
assert _check_ccam("XXXX000")["valid"] is False
# ============================================================
# Cross-checks GHM ↔ GHS
# ============================================================
class TestCrossCheckGhmGhs:
def test_couple_coherent(self):
# 11M122 a bien 4323 dans ses GHS possibles
r = _cross_check_ghm_ghs("11M122", "4323")
assert r["checked"] is True
assert r["coherent"] is True
def test_couple_incoherent(self):
# 11M122 ne correspond pas à n'importe quel GHS
r = _cross_check_ghm_ghs("11M122", "9999")
assert r["checked"] is True
assert r["coherent"] is False
def test_ghm_manquant(self):
r = _cross_check_ghm_ghs("", "4323")
assert r["checked"] is False
def test_ghm_invalide(self):
r = _cross_check_ghm_ghs("99Z999", "4323")
assert r["checked"] is False
assert "invalide" in r["reason"].lower()
# ============================================================
# annotate (intégration)
# ============================================================
class TestAnnotate:
def test_annotate_json_vide(self):
out = annotate({"fichier": "TEST", "extraction": {}})
assert "fichier" in out
assert out["extraction"] == {}
def test_annotate_recueil_complet(self):
raw = {
"fichier": "TEST",
"extraction": {
"recueil": {
"codage_etab": {"dp": "K650", "dr": "", "das": [
{"code": "T814", "position": "2"},
]},
"codage_reco": {"dp": "", "dr": "", "das": []},
"ghm_etab": "11M122",
"ghs_etab": "4323",
"ghm_reco": "",
"ghs_reco": "",
},
},
}
out = annotate(raw)
v = out["extraction"]["recueil"]["_validation"]
assert v["codage_etab"]["dp"]["valid"] is True
assert v["ghm_etab"]["valid"] is True
assert v["cross_checks"]["etab"]["coherent"] is True
assert v["summary"]["valid"] >= 3
def test_annotate_preserve_source(self):
"""L'annotation ne doit pas modifier l'input (copie défensive)."""
raw = {
"fichier": "T",
"extraction": {"recueil": {"codage_etab": {"dp": "K650"}}},
}
out = annotate(raw)
assert "_validation" not in raw["extraction"]["recueil"]
assert "_validation" in out["extraction"]["recueil"]

View File

@@ -0,0 +1,85 @@
"""Tests unitaires pour pipeline.zones_config."""
from __future__ import annotations
import json
import pytest
from pipeline.zones_config import (
DEFAULTS,
get_zone,
load_config,
save_config,
)
class TestLoadConfig:
def test_fichier_absent_retourne_defaults(self, tmp_path):
cfg = load_config(tmp_path / "inexistant.json")
assert cfg == DEFAULTS
def test_charge_depuis_fichier(self, tmp_path):
path = tmp_path / "zones.json"
custom = {
"recueil": {
"codage_reco": {"x1": 0.5, "y1": 0.1, "x2": 0.9, "y2": 0.4,
"description": "test"},
},
}
path.write_text(json.dumps(custom))
cfg = load_config(path)
assert cfg["recueil"]["codage_reco"]["x1"] == 0.5
def test_merge_avec_defaults(self, tmp_path):
"""Les zones non définies dans le fichier tombent en défaut."""
path = tmp_path / "zones.json"
partial = {
"recueil": {"codage_reco": {"x1": 0.1, "y1": 0.2, "x2": 0.3, "y2": 0.4}},
}
path.write_text(json.dumps(partial))
cfg = load_config(path)
# User override appliqué
assert cfg["recueil"]["codage_reco"]["x1"] == 0.1
# Default gardé pour l'autre zone
assert cfg["recueil"]["accord_checkbox"] == DEFAULTS["recueil"]["accord_checkbox"]
def test_json_corrompu_retombe_sur_defaults(self, tmp_path):
path = tmp_path / "corrupt.json"
path.write_text("{ not valid json [")
cfg = load_config(path)
assert cfg == DEFAULTS
class TestSaveConfig:
def test_save_puis_load_round_trip(self, tmp_path):
path = tmp_path / "zones.json"
original = {
"recueil": {
"codage_reco": {"x1": 0.11, "y1": 0.22, "x2": 0.33, "y2": 0.44,
"description": "abc"},
},
}
save_config(original, path)
reloaded = load_config(path)
assert reloaded["recueil"]["codage_reco"]["x1"] == 0.11
assert reloaded["recueil"]["codage_reco"]["description"] == "abc"
class TestGetZone:
def test_zone_existante(self):
z = get_zone("recueil", "codage_reco")
assert isinstance(z, tuple)
assert len(z) == 4
assert all(isinstance(v, float) for v in z)
def test_zone_inconnue_retourne_none(self):
assert get_zone("recueil", "zone_qui_nexiste_pas") is None
assert get_zone("page_fantaisiste", "whatever") is None
def test_config_explicite(self):
cfg = {
"recueil": {
"my_zone": {"x1": 0.0, "y1": 0.0, "x2": 1.0, "y2": 1.0},
},
}
assert get_zone("recueil", "my_zone", config=cfg) == (0.0, 0.0, 1.0, 1.0)