Perf x56 : parallélisation raster + dédup tokens vector (30min → 32s sur 4 PDFs)

- Rasterisation parallèle (ProcessPoolExecutor) : _rasterize_page worker par page
- Déduplication tokens dans redact_pdf_vector : 401 hits → 28 tokens uniques par page
- Séparation phase search / phase annotate pour éviter dégradation PyMuPDF
- Déduplication tokens dans redact_pdf_raster (Phase 1)
- Index by_page dict au lieu de filtrage linéaire par page
- Ajout process_pdfs_batch() pour batch multi-PDF sans NER
- Support OCR word map dans vector et raster (fallback PDFs scannés)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 23:13:20 +01:00
parent ac62a722bb
commit 28da29f521

View File

@@ -14,11 +14,17 @@ Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), tr
from __future__ import annotations
import io
import json
import os
import re
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
# {page_idx: [(word_text, x0_norm, y0_norm, x1_norm, y1_norm), ...]}
# Coordonnées normalisées 0→1 (format natif docTR word.geometry)
OcrWordMap = Dict[int, List[Tuple[str, float, float, float, float]]]
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text
from pdfminer.layout import LAParams
@@ -524,9 +530,19 @@ def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
# ----------------- Extraction -----------------
def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool]:
# Module-level cache so the heavyweight docTR predictor is only built once
# per process (loading detection + recognition weights is slow).
_doctr_model_cache = None


def _get_doctr_model():
    """Return the shared docTR OCR predictor, building it lazily on first call.

    Uses db_resnet50 for text detection and crnn_vgg16_bn for text
    recognition, with pretrained weights. Subsequent calls reuse the
    cached instance.
    """
    global _doctr_model_cache
    if _doctr_model_cache is None:
        _doctr_model_cache = _doctr_ocr_predictor(
            det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True
        )
    return _doctr_model_cache
def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool, OcrWordMap]:
"""Extraction texte multi-passes avec fallback OCR (docTR).
Retourne (pages_text, tables_lines, ocr_used).
Retourne (pages_text, tables_lines, ocr_used, ocr_word_map).
"""
pages_text: List[str] = []
tables_lines: List[List[str]] = []
@@ -568,34 +584,41 @@ def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List
pass
# 4e passe : OCR docTR si toujours très peu de texte (PDF scanné)
total_chars = sum(len(x or "") for x in pages_text)
ocr_word_map: OcrWordMap = {}
if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None:
try:
model = _doctr_ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True)
model = _get_doctr_model()
doc = fitz.open(str(pdf_path))
ocr_pages: List[str] = []
import numpy as np
for i in range(len(doc)):
pix = doc[i].get_pixmap(dpi=300)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
import numpy as np
result = model([np.array(img)])
page_text = ""
page_words: List[Tuple[str, float, float, float, float]] = []
for block in result.pages[0].blocks:
for line in block.lines:
words = [w.value for w in line.words]
page_text += " ".join(words) + "\n"
for w in line.words:
(x0, y0), (x1, y1) = w.geometry
page_words.append((w.value, x0, y0, x1, y1))
page_text += " ".join(w.value for w in line.words) + "\n"
ocr_word_map[i] = page_words
ocr_pages.append(page_text)
doc.close()
if sum(len(p) for p in ocr_pages) > total_chars:
pages_text = ocr_pages
ocr_used = True
else:
ocr_word_map = {}
except Exception:
pass
return pages_text, tables_lines, ocr_used
ocr_word_map = {}
return pages_text, tables_lines, ocr_used, ocr_word_map
# Alias pour compatibilité ascendante
def extract_text_three_passes(pdf_path: Path):
pages_text, tables_lines, _ = extract_text_with_fallback_ocr(pdf_path)
pages_text, tables_lines, _, _ = extract_text_with_fallback_ocr(pdf_path)
return pages_text, tables_lines
# ----------------- Helpers -----------------
@@ -1368,6 +1391,26 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
# ----------------- PDF Redaction -----------------
def _search_ocr_words(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect) -> list:
    """Search for a token among the OCR words of a single page.

    Multi-word tokens are searched word by word. OCR word geometry is
    normalized to 0..1 (docTR convention), so each match is scaled by the
    page rectangle to produce fitz.Rect instances in PDF point coordinates.

    Args:
        ocr_words: list of (word_text, x0, y0, x1, y1) with normalized coords.
        token: text to locate; may contain spaces (split into single words).
        page_rect: the page's fitz rect, providing width/height in points.

    Returns:
        List of fitz.Rect covering every matching OCR word (may be empty).
    """
    rects = []
    tokens_to_search = token.split() if " " in token else [token]
    for t in tokens_to_search:
        t_lower = t.lower().strip()
        if not t_lower:
            continue
        for (word, x0n, y0n, x1n, y1n) in ocr_words:
            # Case-insensitive comparison; punctuation attached to the OCR
            # word (e.g. "Dupont,") is stripped before matching.
            if word.lower().strip(".,;:!?()") == t_lower:
                rects.append(fitz.Rect(
                    x0n * page_rect.width,
                    y0n * page_rect.height,
                    x1n * page_rect.width,
                    y1n * page_rect.height,
                ))
    return rects
def _search_whole_word(page, token: str) -> list:
"""Cherche un token comme mot entier (pas substring) via get_text('words').
Évite les faux positifs de page.search_for() qui fait du substring matching."""
@@ -1380,7 +1423,7 @@ def _search_whole_word(page, token: str) -> list:
rects.append(fitz.Rect(w[0], w[1], w[2], w[3]))
return rects
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None:
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, ocr_word_map: OcrWordMap = None) -> None:
if fitz is None:
raise RuntimeError("PyMuPDF non disponible installez pymupdf.")
doc = fitz.open(str(original_pdf))
@@ -1399,27 +1442,32 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) ->
hits = by_page.get(pno, []) + by_page.get(-1, [])
if not hits:
continue
# Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page
seen_tokens: set = set()
all_rects = []
for h in hits:
token = h.original.strip()
if not token:
continue
# Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF)
if h.kind in _VECTOR_SKIP_KINDS:
continue
# Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter
# les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations")
# Clé de déduplication : le token lui-même (même token cherché une seule fois)
dedup_key = token
if dedup_key in seen_tokens:
continue
seen_tokens.add(dedup_key)
if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5:
if token.lower() not in _MEDICAL_STOP_WORDS_SET:
rects = _search_whole_word(page, token)
for r in rects:
page.add_redact_annot(r, fill=(0,0,0))
if not rects and ocr_word_map and pno in ocr_word_map:
rects = _search_ocr_words(ocr_word_map[pno], token, page.rect)
all_rects.extend(rects)
continue
rects = page.search_for(token)
if not rects and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
if compact != token:
rects = page.search_for(compact)
# Fallback : chercher chaque mot individuellement (uniquement pour les NOM)
if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}:
for word in token.split():
word = word.strip(" .-'")
@@ -1428,8 +1476,12 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) ->
if not word[0].isupper():
continue
rects.extend(page.search_for(word))
for r in rects:
page.add_redact_annot(r, fill=(0,0,0))
if not rects and ocr_word_map and pno in ocr_word_map:
rects = _search_ocr_words(ocr_word_map[pno], token, page.rect)
all_rects.extend(rects)
# Appliquer toutes les annotations d'un coup (évite de ralentir search_for)
for r in all_rects:
page.add_redact_annot(r, fill=(0, 0, 0))
try:
page.apply_redactions()
except Exception:
@@ -1438,84 +1490,116 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) ->
doc.close()
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None) -> None:
def _rasterize_page(args):
    """Parallel worker: rasterize one PDF page and burn in black redaction boxes.

    Designed to run in a ProcessPoolExecutor child process, so it receives a
    single picklable tuple and reopens the PDF itself rather than sharing a
    fitz document across the process boundary.

    Args:
        args: tuple (pdf_path_str, pno, rects_tuples, dpi, ogc_label) where
              rects_tuples is a list of (x0, y0, x1, y1) in PDF points and
              ogc_label, if truthy, is stamped in the top-right corner.

    Returns:
        (pno, png_bytes, page_width_pts, page_height_pts) so the caller can
        rebuild the output PDF in page order at the original page size.
    """
    pdf_path_str, pno, rects_tuples, dpi, ogc_label = args
    doc = fitz.open(pdf_path_str)
    src = doc[pno]
    rect_w, rect_h = src.rect.width, src.rect.height
    zoom = dpi / 72.0  # PDF points are 1/72 inch, so this maps points -> pixels
    pix = src.get_pixmap(matrix=fitz.Matrix(zoom, zoom), annots=False)
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    draw = ImageDraw.Draw(img)
    # Shrink each box horizontally by 1.5 px per side so the black fill does
    # not bleed onto adjacent text.
    shrink = 1.5
    for (x0, y0, x1, y1) in rects_tuples:
        rx0 = x0 * zoom + shrink
        ry0 = y0 * zoom
        rx1 = x1 * zoom - shrink
        ry1 = y1 * zoom
        if rx1 > rx0:  # skip boxes the shrink collapsed to zero/negative width
            draw.rectangle([rx0, ry0, rx1, ry1], fill=(0, 0, 0))
    if ogc_label:
        # Stamp "OGC: <label>" in the top-right corner: white background
        # rectangle with black text, scaled with the rendering zoom.
        from PIL import ImageFont
        font_size = int(14 * zoom)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
        except Exception:
            # Fallback when the DejaVu font is not installed on this system.
            font = ImageFont.load_default()
        text = f"OGC: {ogc_label}"
        bbox = draw.textbbox((0, 0), text, font=font)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        margin = int(10 * zoom)
        x = img.width - tw - margin
        y = margin
        draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255))
        draw.text((x, y), text, fill=(0, 0, 0), font=font)
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    doc.close()
    return pno, buf.getvalue(), rect_w, rect_h
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None, ocr_word_map: OcrWordMap = None) -> None:
if fitz is None:
raise RuntimeError("PyMuPDF non disponible installez pymupdf.")
doc = fitz.open(str(original_pdf)); out = fitz.open()
doc = fitz.open(str(original_pdf))
all_rects: Dict[int, List["fitz.Rect"]] = {}
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
_RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
"EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
by_page: Dict[int, List[PiiHit]] = {}
for h in audit:
by_page.setdefault(h.page, []).append(h)
for pno in range(len(doc)):
page = doc[pno]
rects = []
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
_RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
"EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
hits = [x for x in audit if x.page in {pno, -1}]
seen_tokens: set = set()
hits = by_page.get(pno, []) + by_page.get(-1, [])
for h in hits:
token = h.original.strip()
if not token: continue
# Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF)
if h.kind in _RASTER_SKIP_KINDS:
if not token or h.kind in _RASTER_SKIP_KINDS:
continue
# Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter
# les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations")
if token in seen_tokens:
continue
seen_tokens.add(token)
if h.kind in _RASTER_SHORT_TOKEN_KINDS and len(token) < 5:
if token.lower() not in _MEDICAL_STOP_WORDS_SET:
rects.extend(_search_whole_word(page, token))
found_short = _search_whole_word(page, token)
if not found_short and ocr_word_map and pno in ocr_word_map:
found_short = _search_ocr_words(ocr_word_map[pno], token, page.rect)
rects.extend(found_short)
continue
found = page.search_for(token)
if not found and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
found = page.search_for(compact)
# Fallback : si la chaîne complète n'est pas trouvée,
# chercher chaque mot individuellement (uniquement pour les NOM)
if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}:
for word in token.split():
word = word.strip(" .-'")
if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET:
continue
# Ne garder que les mots qui ressemblent à des noms propres
if not word[0].isupper():
continue
found.extend(page.search_for(word))
if not found and ocr_word_map and pno in ocr_word_map:
found = _search_ocr_words(ocr_word_map[pno], token, page.rect)
rects.extend(found)
all_rects[pno] = rects
for pno in range(len(doc)):
src = doc[pno]; rect = src.rect
zoom = dpi / 72.0; mat = fitz.Matrix(zoom, zoom)
pix = src.get_pixmap(matrix=mat, annots=False)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
draw = ImageDraw.Draw(img)
for r in all_rects.get(pno, []):
# Rétrécir légèrement les rectangles pour éviter le débordement sur le texte adjacent
shrink = 1.5 # pixels à retirer de chaque côté
x0 = r.x0 * zoom + shrink
y0 = r.y0 * zoom
x1 = r.x1 * zoom - shrink
y1 = r.y1 * zoom
if x1 > x0:
draw.rectangle([x0, y0, x1, y1], fill=(0, 0, 0))
# Incrustation OGC en haut à droite
if ogc_label:
from PIL import ImageFont
font_size = int(14 * zoom)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
except Exception:
font = ImageFont.load_default()
text = f"OGC: {ogc_label}"
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
margin = int(10 * zoom)
x = img.width - tw - margin
y = margin
# Fond blanc + texte noir
draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255))
draw.text((x, y), text, fill=(0, 0, 0), font=font)
buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0)
dst = out.new_page(width=rect.width, height=rect.height)
dst.insert_image(rect, stream=buf.getvalue())
# Phase 2 : rasterisation parallèle (ProcessPoolExecutor)
n_pages = len(doc)
rects_as_tuples = {
pno: [(r.x0, r.y0, r.x1, r.y1) for r in rects]
for pno, rects in all_rects.items()
}
doc.close() # fermer AVANT le fork
n_workers = min(n_pages, os.cpu_count() or 4)
tasks = [
(str(original_pdf), pno, rects_as_tuples.get(pno, []), dpi, ogc_label)
for pno in range(n_pages)
]
with ProcessPoolExecutor(max_workers=n_workers) as pool:
results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0])
# Assemblage final (séquentiel, rapide)
out = fitz.open()
for pno, png_bytes, w, h in results:
dst = out.new_page(width=w, height=h)
dst.insert_image(fitz.Rect(0, 0, w, h), stream=png_bytes)
out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
out.close(); doc.close()
out.close()
# ----------------- Orchestration -----------------
@@ -1532,7 +1616,7 @@ def process_pdf(
) -> Dict[str, str]:
out_dir.mkdir(parents=True, exist_ok=True)
cfg = load_dictionaries(config_path)
pages_text, tables_lines, ocr_used = extract_text_with_fallback_ocr(pdf_path)
pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path)
# 1) Regex rules
anon = anonymise_document_regex(pages_text, tables_lines, cfg)
@@ -1693,16 +1777,42 @@ def process_pdf(
if make_vector_redaction and fitz is not None:
vec_path = out_dir / f"{base}.redacted_vector.pdf"
try:
redact_pdf_vector(pdf_path, anon.audit, vec_path)
redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map)
outputs["pdf_vector"] = str(vec_path)
except Exception:
pass
if also_make_raster_burn and fitz is not None:
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label)
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
outputs["pdf_raster"] = str(ras_path)
return outputs
def process_pdfs_batch(
    pdf_paths: List[Path],
    out_dir: Path,
    max_workers: Optional[int] = None,
    **kwargs,
) -> List[Dict[str, str]]:
    """Process several PDFs in parallel via ProcessPoolExecutor.

    Only usable when ner_manager=None (NER models are not picklable). When
    NER is active, PDFs must be processed sequentially, but they still
    benefit from the page-level parallelism inside redact_pdf_raster().

    Args:
        pdf_paths: PDFs to process; an empty list returns [] immediately.
        out_dir: output directory, created if missing.
        max_workers: worker process count; defaults to
            min(len(pdf_paths), cpu count).
        **kwargs: forwarded verbatim to process_pdf(); every value must be
            picklable since it crosses the process boundary.

    Returns:
        One outputs dict per input PDF, in input order.
    """
    if not pdf_paths:
        return []
    if max_workers is None:
        max_workers = min(len(pdf_paths), os.cpu_count() or 4)
    out_dir.mkdir(parents=True, exist_ok=True)
    # BUG FIX: the previous implementation submitted a locally-defined
    # closure to pool.map(). ProcessPoolExecutor pickles the callable for
    # each work item, and local functions cannot be pickled, so every batch
    # failed at runtime. functools.partial over the module-level process_pdf
    # IS picklable and behaves identically.
    from functools import partial
    worker = partial(process_pdf, out_dir=out_dir, **kwargs)
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(worker, pdf_paths))
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="Anonymiser PDF (regex + NER ONNX optionnel)")