fix(perf): apply MVP threading hotfix
Configure numerical library and torch threading for H1, keep raster threading/timing instrumentation, remove CONCERTATION from forced masks after real PDF FP testing, and record coordination archive state.
This commit is contained in:
@@ -19,9 +19,20 @@ import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
import time
|
||||
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
|
||||
# --- H1 perf (D-19) : usage multi-cœur des libs numériques en EXE frozen ---
|
||||
# En PyInstaller frozen, OpenMP/MKL/BLAS tombent souvent à 1 thread (CPU ~12 %).
|
||||
# Ces variables sont lues par numpy/torch/onnxruntime à leur init : elles doivent
|
||||
# donc être posées AVANT l'import de pdfplumber/PIL (numpy transitif) ci-dessous.
|
||||
# setdefault : on n'écrase jamais un réglage explicite posé par l'utilisateur/admin.
|
||||
_n_cpu_threads = str(os.cpu_count() or 4)
|
||||
for _env in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
|
||||
"NUMEXPR_NUM_THREADS", "VECLIB_MAXIMUM_THREADS"):
|
||||
os.environ.setdefault(_env, _n_cpu_threads)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
@@ -64,6 +75,46 @@ except Exception:
|
||||
_doctr_ocr_predictor = None # type: ignore
|
||||
_DOCTR_AVAILABLE = False
|
||||
|
||||
_doctr_model_cache = None
|
||||
_TORCH_THREADS_CONFIGURED = False
|
||||
|
||||
def _configure_torch_threads():
    """Configure PyTorch thread pools so all cores are used in frozen mode.

    In a PyInstaller EXE, torch does not configure its threads by default and
    stays at 1 intra-op + 1 inter-op thread, which caps docTR OCR and NER at
    roughly 12 % CPU on an 8-thread machine.

    Idempotent: it can be called from the OCR path (scanned document) as well
    as from the NER path (native document, no OCR). `set_num_interop_threads`
    may only be set once, before any parallel work has started; the
    module-level flag prevents a second call that would raise.

    Best-effort: if torch is missing or rejects the configuration, the failure
    is logged at debug level and nothing is raised. In that case the flag is
    left unset, so a later call tries again.
    """
    global _TORCH_THREADS_CONFIGURED
    if _TORCH_THREADS_CONFIGURED:
        return
    try:
        import torch

        n_cpus = os.cpu_count() or 4
        torch.set_num_threads(n_cpus)
        try:
            torch.set_num_interop_threads(min(n_cpus, 8))
        except Exception as interop_err:
            # Inter-op pool already frozen by earlier torch work: non-blocking,
            # but keep a trace so the effective value logged below is explainable.
            log.debug("torch inter-op threads left unchanged: %s", interop_err)
        _TORCH_THREADS_CONFIGURED = True
        # Report what torch actually holds, not what was requested: the
        # inter-op request above may have been rejected.
        log.info("torch threads config: intra=%d inter=%d (CPUs=%d)",
                 torch.get_num_threads(), torch.get_num_interop_threads(), n_cpus)
    except Exception as e:
        log.debug("torch threads config skipped: %s", e)
|
||||
|
||||
def _get_doctr_model():
    """Return the shared docTR OCR predictor, building it on first use.

    The predictor is stored in the module-level `_doctr_model_cache`, so the
    model (db_resnet50 detection + crnn_vgg16_bn recognition, pretrained) is
    only constructed once. Torch threading is configured just before that
    first construction.
    """
    global _doctr_model_cache
    if _doctr_model_cache is not None:
        return _doctr_model_cache

    # First use: set up torch threads before building the predictor.
    _configure_torch_threads()
    _doctr_model_cache = _doctr_ocr_predictor(
        det_arch="db_resnet50",
        reco_arch="crnn_vgg16_bn",
        pretrained=True,
    )
    return _doctr_model_cache
|
||||
|
||||
try:
|
||||
from detectors.hospital_filter import HospitalFilter
|
||||
_HOSPITAL_FILTER_AVAILABLE = True
|
||||
@@ -1093,16 +1144,6 @@ def _apply_admin_identifier_hits(full_raw: str, audit: List["PiiHit"], cfg: Dict
|
||||
|
||||
# ----------------- Extraction -----------------
|
||||
|
||||
_doctr_model_cache = None
|
||||
|
||||
def _get_doctr_model():
    """Return the shared docTR OCR predictor, building it on first use.

    The predictor is stored in the module-level `_doctr_model_cache`, so the
    model (db_resnet50 detection + crnn_vgg16_bn recognition, pretrained) is
    only constructed once.
    """
    global _doctr_model_cache
    if _doctr_model_cache is not None:
        return _doctr_model_cache

    _doctr_model_cache = _doctr_ocr_predictor(
        det_arch="db_resnet50",
        reco_arch="crnn_vgg16_bn",
        pretrained=True,
    )
    return _doctr_model_cache
|
||||
|
||||
def _extract_page_layout_aware(page) -> str:
|
||||
"""Extrait le texte d'une page PyMuPDF en gérant les layouts multi-colonnes.
|
||||
|
||||
@@ -1763,7 +1804,7 @@ def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
|
||||
parts = SPLITTER.split(line, maxsplit=1)
|
||||
# Une ligne narrative qui se termine par ` ;` ou ` :` produit un split
|
||||
# avec une "value" vide. La "key" contient alors tout le narratif —
|
||||
# incluant d'éventuels force_term (`CHUXX`, `CONCERTATION`…) qui doivent
|
||||
# incluant d'éventuels force_term (`CHUXX`, sigle local...) qui doivent
|
||||
# être masqués. Idem si la "key" fait plus de 5 mots : c'est très
|
||||
# probablement du narratif, pas un libellé `Label : valeur`.
|
||||
if len(parts) == 2 and parts[1].strip() and len(parts[0].split()) <= 5:
|
||||
@@ -2977,6 +3018,11 @@ def _run_ner_on_original_text(
|
||||
Returns:
|
||||
Liste de NerDetection dédupliquée (par token+label+page+source).
|
||||
"""
|
||||
# H1 perf (D-19) : couvre le cas du PDF natif (texte riche, OCR sauté) où
|
||||
# _get_doctr_model() n'est jamais appelé ; les NER torch (EDS-Pseudo, GLiNER)
|
||||
# tourneraient alors mono-thread. Idempotent (no-op si déjà configuré par l'OCR).
|
||||
_configure_torch_threads()
|
||||
|
||||
detections: List[NerDetection] = []
|
||||
seen: set = set() # (token_lower, label, page_idx, source) pour dédoublonnage
|
||||
|
||||
@@ -4274,7 +4320,9 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
|
||||
rects.extend(found)
|
||||
all_rects[pno] = rects
|
||||
|
||||
# Phase 2 : rasterisation parallèle (ProcessPoolExecutor)
|
||||
# Phase 2 : rasterisation parallèle (ProcessPoolExecutor hors EXE,
|
||||
# ThreadPoolExecutor en EXE PyInstaller pour éviter de relancer la GUI).
|
||||
raster_t0 = time.perf_counter()
|
||||
n_pages = len(doc)
|
||||
rects_as_tuples = {
|
||||
pno: [(r.x0, r.y0, r.x1, r.y1) for r in rects]
|
||||
@@ -4313,12 +4361,26 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
|
||||
for pno in range(n_pages)
|
||||
]
|
||||
|
||||
# Mode frozen (PyInstaller --onefile) : ProcessPoolExecutor relance l'exe
|
||||
# et ouvre des fenêtres GUI fantômes → séquentiel obligatoire
|
||||
if getattr(sys, 'frozen', False) or n_pages <= 2:
|
||||
frozen = bool(getattr(sys, 'frozen', False))
|
||||
disable_threads = os.getenv("ANON_DISABLE_RASTER_THREADS", "").lower() in {"1", "true", "yes", "on"}
|
||||
if n_pages <= 2:
|
||||
log.info("Raster PDF: mode=sequential pages=%d dpi=%d reason=small_pdf", n_pages, dpi)
|
||||
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
|
||||
elif frozen and not disable_threads:
|
||||
n_workers = min(n_pages, os.cpu_count() or 4)
|
||||
log.info("Raster PDF: mode=threads pages=%d workers=%d dpi=%d frozen=1", n_pages, n_workers, dpi)
|
||||
try:
|
||||
with ThreadPoolExecutor(max_workers=n_workers) as pool:
|
||||
results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0])
|
||||
except Exception as e:
|
||||
log.warning("Raster PDF threaded mode failed, fallback sequential: %s", e)
|
||||
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
|
||||
elif frozen and disable_threads:
|
||||
log.info("Raster PDF: mode=sequential pages=%d dpi=%d frozen=1 reason=env_disabled", n_pages, dpi)
|
||||
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
|
||||
else:
|
||||
n_workers = min(n_pages, os.cpu_count() or 4)
|
||||
log.info("Raster PDF: mode=processes pages=%d workers=%d dpi=%d frozen=0", n_pages, n_workers, dpi)
|
||||
with ProcessPoolExecutor(max_workers=n_workers) as pool:
|
||||
results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0])
|
||||
|
||||
@@ -4331,6 +4393,7 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
|
||||
_apply_pseudo_xmp_metadata(out)
|
||||
out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
|
||||
out.close()
|
||||
log.info("Raster PDF done: pages=%d output=%s duration=%.2fs", n_pages, out_pdf.name, time.perf_counter() - raster_t0)
|
||||
|
||||
# ----------------- VLM pour PDFs scannés -----------------
|
||||
|
||||
@@ -4424,15 +4487,31 @@ def process_pdf(
|
||||
camembert_manager=None,
|
||||
quarantine_mgr: Optional["QuarantineManager"] = None,
|
||||
) -> Dict[str, str]:
|
||||
perf_t0 = time.perf_counter()
|
||||
last_mark = perf_t0
|
||||
|
||||
def _perf_mark(stage: str) -> None:
|
||||
nonlocal last_mark
|
||||
now = time.perf_counter()
|
||||
log.info("PERF %s: stage=%s duration=%.2fs total=%.2fs",
|
||||
pdf_path.name, stage, now - last_mark, now - perf_t0)
|
||||
last_mark = now
|
||||
|
||||
log.info("PERF %s: start frozen=%s vector=%s raster=%s",
|
||||
pdf_path.name, bool(getattr(sys, "frozen", False)), make_vector_redaction, also_make_raster_burn)
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
cfg = load_dictionaries(config_path)
|
||||
_perf_mark("load_config")
|
||||
pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path)
|
||||
_perf_mark("extract_text_ocr")
|
||||
|
||||
# Q-1 B-3 : pré-flight texte vide. Si moins de SEUIL_TEXTE_MINI caractères
|
||||
# extraits, c'est probablement un scan non-OCRisé ou un document corrompu.
|
||||
# On NE traite PAS — quarantaine totale, le doc original est copié pour
|
||||
# ré-essai manuel.
|
||||
extracted_chars = sum(len(p) for p in pages_text)
|
||||
log.info("PERF %s: pages=%d extracted_chars=%d ocr_used=%s ocr_pages=%d",
|
||||
pdf_path.name, len(pages_text), extracted_chars, bool(ocr_used), len(ocr_word_map or {}))
|
||||
if extracted_chars < SEUIL_TEXTE_MINI:
|
||||
log.warning("Preflight failed for %s: only %d chars extracted (seuil=%d)",
|
||||
pdf_path.name, extracted_chars, SEUIL_TEXTE_MINI)
|
||||
@@ -4479,12 +4558,14 @@ def process_pdf(
|
||||
gliner_mgr=gliner_manager,
|
||||
camembert_mgr=camembert_manager,
|
||||
)
|
||||
_perf_mark("regex_rules")
|
||||
|
||||
# 1b) VLM (optionnel) — sur les PDFs scannés uniquement
|
||||
if ocr_used and vlm_manager is not None and VlmManager is not None:
|
||||
try:
|
||||
if vlm_manager.is_loaded():
|
||||
_apply_vlm_on_scanned_pdf(pdf_path, anon, ocr_word_map, vlm_manager)
|
||||
_perf_mark("vlm_scan")
|
||||
except Exception:
|
||||
pass # dégradation gracieuse
|
||||
|
||||
@@ -4498,9 +4579,11 @@ def process_pdf(
|
||||
else:
|
||||
final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds)
|
||||
anon.audit.extend(hf_hits)
|
||||
_perf_mark("ner_optional")
|
||||
|
||||
# 3) Rescan selectif
|
||||
final_text = selective_rescan(final_text, cfg=cfg)
|
||||
_perf_mark("selective_rescan")
|
||||
|
||||
# 3a-bis) Nettoyage post-masquage : continuation orpheline d'un nom composé
|
||||
# coupé par saut de ligne. Cas Trackare en colonnes : "NOCENT-EJNAINI"
|
||||
@@ -4820,6 +4903,7 @@ def process_pdf(
|
||||
r"DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|AGE|NIR|IBAN|OGC)\])\]+"
|
||||
)
|
||||
final_text = _RE_BRACKET_CLEAN.sub(r"\1", final_text)
|
||||
_perf_mark("post_cleaning")
|
||||
|
||||
# 6) Whitelist absolue : filtrer les hits qui matchent un terme whitelist
|
||||
# de la GUI (clé YAML whitelist_phrases). Filet de sécurité après tous les
|
||||
@@ -4960,6 +5044,7 @@ def process_pdf(
|
||||
for hit in audit_for_file:
|
||||
f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
|
||||
outputs = {"text": str(txt_path), "audit": str(audit_path)}
|
||||
_perf_mark("write_text_audit")
|
||||
|
||||
# PDFs
|
||||
if make_vector_redaction and fitz is not None:
|
||||
@@ -4967,6 +5052,7 @@ def process_pdf(
|
||||
try:
|
||||
redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map)
|
||||
outputs["pdf_vector"] = str(vec_path)
|
||||
_perf_mark("pdf_vector")
|
||||
except Exception as e:
|
||||
# Q-1 D2/D3 : ne plus avaler silencieusement. Le texte (.pseudonymise.txt)
|
||||
# est déjà sorti avant ce bloc.
|
||||
@@ -5023,6 +5109,8 @@ def process_pdf(
|
||||
ras_path = out_dir / f"{base}.redacted_raster.pdf"
|
||||
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
|
||||
outputs["pdf_raster"] = str(ras_path)
|
||||
_perf_mark("pdf_raster")
|
||||
log.info("PERF %s: done total=%.2fs outputs=%s", pdf_path.name, time.perf_counter() - perf_t0, sorted(outputs.keys()))
|
||||
return outputs
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user