From 28da29f52126e81b98d1504cc211cc746fe8f399 Mon Sep 17 00:00:00 2001 From: Domi31tls Date: Thu, 26 Feb 2026 23:13:20 +0100 Subject: [PATCH] =?UTF-8?q?Perf=20x56=20:=20parall=C3=A9lisation=20raster?= =?UTF-8?q?=20+=20d=C3=A9dup=20tokens=20vector=20(30min=20=E2=86=92=2032s?= =?UTF-8?q?=20sur=204=20PDFs)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rasterisation parallèle (ProcessPoolExecutor) : _rasterize_page worker par page - Déduplication tokens dans redact_pdf_vector : 401 hits → 28 tokens uniques par page - Séparation phase search / phase annotate pour éviter dégradation PyMuPDF - Déduplication tokens dans redact_pdf_raster (Phase 1) - Index by_page dict au lieu de filtrage linéaire par page - Ajout process_pdfs_batch() pour batch multi-PDF sans NER - Support OCR word map dans vector et raster (fallback PDFs scannés) Co-Authored-By: Claude Opus 4.6 --- anonymizer_core_refactored_onnx.py | 254 +++++++++++++++++++++-------- 1 file changed, 182 insertions(+), 72 deletions(-) diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 1bcc3ed..801b6be 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -14,11 +14,17 @@ Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), tr from __future__ import annotations import io import json +import os import re +from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Tuple, Optional, Any +# {page_idx: [(word_text, x0_norm, y0_norm, x1_norm, y1_norm), ...]} +# Coordonnées normalisées 0→1 (format natif docTR word.geometry) +OcrWordMap = Dict[int, List[Tuple[str, float, float, float, float]]] + import pdfplumber from pdfminer.high_level import extract_text as pdfminer_extract_text from pdfminer.layout import LAParams @@ -524,9 +530,19 @@ def load_dictionaries(config_path: Optional[Path]) -> 
# Process-wide docTR predictor, created lazily by _get_doctr_model().
_doctr_model_cache = None


def _get_doctr_model():
    """Return the shared docTR OCR predictor, building it on first use.

    The predictor is stored in the module-level ``_doctr_model_cache`` so
    that later calls in the same process return the same object instead of
    constructing a new one.
    """
    global _doctr_model_cache
    if _doctr_model_cache is not None:
        return _doctr_model_cache
    _doctr_model_cache = _doctr_ocr_predictor(
        det_arch="db_resnet50",
        reco_arch="crnn_vgg16_bn",
        pretrained=True,
    )
    return _doctr_model_cache
def _search_ocr_words(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect) -> list:
    """Locate a token among the OCR words of one page.

    The token is split on any whitespace and each part is matched against
    the OCR words individually (case-insensitive, whole word). The same
    surrounding punctuation is stripped from both the token parts and the
    OCR words, so a token such as "Dupont," still matches the OCR word
    "Dupont".

    Args:
        ocr_words: ``(text, x0, y0, x1, y1)`` tuples for the page; the
            coordinates are expected to be normalised to the 0-1 range.
        token: Text to look for; may contain several words.
        page_rect: Page rectangle; only ``width`` and ``height`` are read,
            to scale the normalised boxes to PDF points.

    Returns:
        A list of ``fitz.Rect`` in PDF points, one per matching OCR word
        (empty when nothing matches or the token has no searchable part).
    """
    punctuation = ".,;:!?()"
    # dict.fromkeys keeps first-seen order while dropping repeated parts,
    # which would otherwise yield the same rectangles several times.
    targets = [
        part
        for part in dict.fromkeys(
            piece.strip(punctuation).lower() for piece in token.split()
        )
        if part
    ]
    if not targets:
        return []

    width, height = page_rect.width, page_rect.height
    # Normalise each OCR word once instead of once per token part.
    cleaned = [
        (word.lower().strip(punctuation), x0n, y0n, x1n, y1n)
        for (word, x0n, y0n, x1n, y1n) in ocr_words
    ]

    rects = []
    for target in targets:
        for text, x0n, y0n, x1n, y1n in cleaned:
            if text == target:
                rects.append(fitz.Rect(
                    x0n * width,
                    y0n * height,
                    x1n * width,
                    y1n * height,
                ))
    return rects
Évite les faux positifs de page.search_for() qui fait du substring matching.""" @@ -1380,7 +1423,7 @@ def _search_whole_word(page, token: str) -> list: rects.append(fitz.Rect(w[0], w[1], w[2], w[3])) return rects -def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None: +def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, ocr_word_map: OcrWordMap = None) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)) @@ -1399,27 +1442,32 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> hits = by_page.get(pno, []) + by_page.get(-1, []) if not hits: continue + # Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page + seen_tokens: set = set() + all_rects = [] for h in hits: token = h.original.strip() if not token: continue - # Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF) if h.kind in _VECTOR_SKIP_KINDS: continue - # Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter - # les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations") + # Clé de déduplication : le token lui-même (même token cherché une seule fois) + dedup_key = token + if dedup_key in seen_tokens: + continue + seen_tokens.add(dedup_key) if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5: if token.lower() not in _MEDICAL_STOP_WORDS_SET: rects = _search_whole_word(page, token) - for r in rects: - page.add_redact_annot(r, fill=(0,0,0)) + if not rects and ocr_word_map and pno in ocr_word_map: + rects = _search_ocr_words(ocr_word_map[pno], token, page.rect) + all_rects.extend(rects) continue rects = page.search_for(token) if not rects and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) if compact != token: rects = page.search_for(compact) - # Fallback : chercher chaque mot individuellement (uniquement pour les NOM) if not rects and " " in token and 
def _rasterize_page(args):
    """Render one PDF page to PNG and burn black boxes over the given areas.

    Written as a ``ProcessPoolExecutor`` worker: it receives a single
    picklable tuple, reopens the PDF itself and returns only plain values.

    Args:
        args: ``(pdf_path_str, pno, rects_tuples, dpi, ogc_label)`` where
            ``rects_tuples`` is a list of ``(x0, y0, x1, y1)`` in PDF points
            and ``ogc_label`` is an optional label stamped top-right.

    Returns:
        ``(pno, png_bytes, page_width, page_height)`` with the page size in
        PDF points.
    """
    pdf_path_str, pno, rects_tuples, dpi, ogc_label = args
    zoom = dpi / 72.0  # PDF points are 1/72 inch

    doc = fitz.open(pdf_path_str)
    try:
        src = doc[pno]
        rect_w, rect_h = src.rect.width, src.rect.height
        pix = src.get_pixmap(matrix=fitz.Matrix(zoom, zoom), annots=False)
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    finally:
        # Always release the document, even if rendering fails.
        doc.close()

    draw = ImageDraw.Draw(img)
    # Shrink boxes slightly on the left/right so they do not spill over
    # adjacent text.
    shrink = 1.5
    for (x0, y0, x1, y1) in rects_tuples:
        rx0 = x0 * zoom + shrink
        ry0 = y0 * zoom
        rx1 = x1 * zoom - shrink
        ry1 = y1 * zoom
        if rx1 > rx0:
            draw.rectangle([rx0, ry0, rx1, ry1], fill=(0, 0, 0))

    if ogc_label:
        from PIL import ImageFont
        font_size = int(14 * zoom)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
        except Exception:
            # Font file not available on this machine: use PIL's built-in one.
            font = ImageFont.load_default()
        text = f"OGC: {ogc_label}"
        bbox = draw.textbbox((0, 0), text, font=font)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        margin = int(10 * zoom)
        x = img.width - tw - margin
        y = margin
        # White backing box, then black text, in the top-right corner.
        draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255))
        draw.text((x, y), text, fill=(0, 0, 0), font=font)

    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return pno, buf.getvalue(), rect_w, rect_h


def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None, ocr_word_map: Optional[OcrWordMap] = None) -> None:
    """Write a rasterised copy of a PDF with every audited token blacked out.

    Phase 1 locates, page by page, the rectangles of the tokens listed in
    ``audit`` (hits with ``page == -1`` apply to every page). Phase 2 renders
    each page to an image with those rectangles painted black, using one
    worker process per page when several pages and CPUs are available, and
    assembles the images into ``out_pdf``.

    Args:
        original_pdf: Source PDF.
        audit: Hits to redact; ``original``, ``kind`` and ``page`` are read.
        out_pdf: Destination path of the rasterised PDF.
        dpi: Rendering resolution.
        ogc_label: Optional label stamped on every page.
        ocr_word_map: Optional per-page OCR words, used as a fallback when
            the PDF text layer yields no match for a token.

    Raises:
        RuntimeError: If PyMuPDF is not available.
    """
    if fitz is None:
        raise RuntimeError("PyMuPDF non disponible – installez pymupdf.")

    skip_kinds = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
    short_token_kinds = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
                         "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
    compact_kinds = {"NIR", "IBAN", "TEL"}
    name_kinds = {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}

    by_page: Dict[int, List[PiiHit]] = {}
    for h in audit:
        by_page.setdefault(h.page, []).append(h)

    # Phase 1: collect redaction rectangles as plain tuples (picklable).
    rects_by_page: Dict[int, List[Tuple[float, float, float, float]]] = {}
    doc = fitz.open(str(original_pdf))
    try:
        n_pages = len(doc)
        for pno in range(n_pages):
            page = doc[pno]
            ocr_words = ocr_word_map.get(pno) if ocr_word_map else None
            # The search strategy depends on the hit kind, so two hits with
            # the same text but different kinds must both be searched;
            # deduplicating on the text alone could drop the only hit whose
            # kind enables a successful fallback.
            seen: set = set()
            # Dict used as an ordered set: the same rectangle found via two
            # kinds is only drawn once.
            page_rects: Dict[Tuple[float, float, float, float], None] = {}
            for h in by_page.get(pno, []) + by_page.get(-1, []):
                token = h.original.strip()
                if not token or h.kind in skip_kinds:
                    continue
                key = (token, h.kind)
                if key in seen:
                    continue
                seen.add(key)

                if h.kind in short_token_kinds and len(token) < 5:
                    # Short name-like tokens: whole-word matching only, to
                    # avoid substring false positives.
                    if token.lower() in _MEDICAL_STOP_WORDS_SET:
                        continue
                    found = list(_search_whole_word(page, token))
                else:
                    found = list(page.search_for(token))
                    if not found and h.kind in compact_kinds:
                        compact = re.sub(r"\s+", "", token)
                        found = list(page.search_for(compact))
                    if not found and " " in token and h.kind in name_kinds:
                        # Full string not found: try each word that looks
                        # like a proper noun.
                        for word in token.split():
                            word = word.strip(" .-'")
                            if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET:
                                continue
                            if not word[0].isupper():
                                continue
                            found.extend(page.search_for(word))

                if not found and ocr_words:
                    found = _search_ocr_words(ocr_words, token, page.rect)
                for r in found:
                    page_rects[(r.x0, r.y0, r.x1, r.y1)] = None
            rects_by_page[pno] = list(page_rects)
    finally:
        # Close before any worker process is started, and on errors too.
        doc.close()

    # Phase 2: render pages (in parallel when worthwhile) and assemble.
    tasks = [
        (str(original_pdf), pno, rects_by_page.get(pno, []), dpi, ogc_label)
        for pno in range(n_pages)
    ]
    n_workers = min(n_pages, os.cpu_count() or 4)

    out = fitz.open()
    try:
        def _append(result):
            _, png_bytes, w, h = result
            dst = out.new_page(width=w, height=h)
            dst.insert_image(fitz.Rect(0, 0, w, h), stream=png_bytes)

        if n_workers <= 1:
            # 0 or 1 page (or a single CPU): a pool would only add overhead,
            # and ProcessPoolExecutor rejects max_workers=0.
            for task in tasks:
                _append(_rasterize_page(task))
        else:
            with ProcessPoolExecutor(max_workers=n_workers) as pool:
                # pool.map yields results in task order, so pages can be
                # appended as they arrive instead of holding every PNG.
                for result in pool.map(_rasterize_page, tasks):
                    _append(result)
        out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
    finally:
        out.close()
def _process_pdf_job(job):
    """Pool worker: unpack one ``(pdf_path, out_dir, kwargs)`` job and run it.

    Defined at module level because ``ProcessPoolExecutor`` pickles the
    callable it dispatches, and nested functions cannot be pickled.
    """
    pdf_path, out_dir, kwargs = job
    return process_pdf(pdf_path, out_dir, **kwargs)


def process_pdfs_batch(
    pdf_paths: List[Path],
    out_dir: Path,
    max_workers: Optional[int] = None,
    **kwargs,
) -> List[Dict[str, str]]:
    """Process several PDFs, in parallel worker processes when possible.

    Each PDF is handled by ``process_pdf(pdf_path, out_dir, **kwargs)``.
    PDFs are processed one after another in the current process when a
    ``ner_manager`` keyword argument is supplied (NER models are not
    picklable, so they cannot be sent to worker processes) or when only one
    worker would be used.

    Args:
        pdf_paths: PDFs to process.
        out_dir: Output directory, created if missing.
        max_workers: Number of worker processes; defaults to the smaller of
            the number of PDFs and the CPU count.
        **kwargs: Forwarded unchanged to ``process_pdf``.

    Returns:
        One ``process_pdf`` result per input, in the same order as
        ``pdf_paths``; an empty list when ``pdf_paths`` is empty.
    """
    if not pdf_paths:
        return []
    if max_workers is None:
        max_workers = min(len(pdf_paths), os.cpu_count() or 4)
    out_dir.mkdir(parents=True, exist_ok=True)

    if kwargs.get("ner_manager") is not None or max_workers == 1:
        return [process_pdf(pdf_path, out_dir, **kwargs) for pdf_path in pdf_paths]

    jobs = [(pdf_path, out_dir, kwargs) for pdf_path in pdf_paths]
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(_process_pdf_job, jobs))