fix(perf): apply MVP threading hotfix

Configure numerical library and torch threading for H1, keep raster threading/timing instrumentation, remove CONCERTATION from forced masks after real PDF FP testing, and record coordination archive state.
This commit is contained in:
2026-06-08 10:41:15 +02:00
parent 3249f3a337
commit 21a408a9e4
68 changed files with 2075 additions and 20 deletions

View File

@@ -19,9 +19,20 @@ import os
import re
import shutil
import sys
from concurrent.futures import ProcessPoolExecutor
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime
# --- H1 perf (D-19): multi-core use of the numerical libraries in a frozen EXE ---
# Per the original authors' note: in a PyInstaller frozen build, OpenMP/MKL/BLAS
# often fall back to a single thread (CPU ~12 %). These variables are read by
# numpy/torch/onnxruntime when they initialise, so they have to be set BEFORE
# the pdfplumber/PIL imports (which pull in numpy transitively) further down.
# A variable that is already present in the environment is left untouched, so
# an explicit user/admin setting is never overwritten.
_n_cpu_threads = str(os.cpu_count() or 4)  # fall back to 4 when the count is unknown
for _env in (
    "OMP_NUM_THREADS",
    "MKL_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
):
    if _env not in os.environ:
        os.environ[_env] = _n_cpu_threads
log = logging.getLogger(__name__)
from dataclasses import dataclass, field
from pathlib import Path
@@ -64,6 +75,46 @@ except Exception:
_doctr_ocr_predictor = None # type: ignore
_DOCTR_AVAILABLE = False
_doctr_model_cache = None
_TORCH_THREADS_CONFIGURED = False
def _configure_torch_threads():
    """Configure the PyTorch thread pools once per process (best effort).

    Per the original authors' note, in a PyInstaller EXE torch does not
    configure its threads by default and stays at 1 intra-op + 1 inter-op
    thread, which caps docTR OCR and NER at roughly 12 % CPU on an 8-thread
    machine.

    The intra-op pool is set to the CPU count (4 when the count is unknown)
    and the inter-op pool to ``min(cpu_count, 8)``.

    Idempotent: it can be called from the OCR path (scanned document) as well
    as from the NER path (native document, no OCR). ``set_num_interop_threads``
    can only be set once, before any parallel work has started; the
    module-level flag prevents a second call that would raise.

    Never raises: if torch is missing or the configuration fails, the reason
    is logged at DEBUG level and the function returns. In that case the flag
    stays unset, so a later call will try again.
    """
    global _TORCH_THREADS_CONFIGURED
    if _TORCH_THREADS_CONFIGURED:
        return
    try:
        import torch

        n_cpus = os.cpu_count() or 4
        torch.set_num_threads(n_cpus)
        try:
            torch.set_num_interop_threads(min(n_cpus, 8))
        except Exception as exc:
            # Inter-op pool already frozen by earlier torch work: non-blocking,
            # but leave a trace instead of swallowing the failure silently.
            log.debug("torch inter-op threads left unchanged: %s", exc)
        _TORCH_THREADS_CONFIGURED = True
        # Report what torch actually uses, not what was requested: the
        # inter-op request above may have been rejected.
        log.info("torch threads config: intra=%d inter=%d (CPUs=%d)",
                 torch.get_num_threads(), torch.get_num_interop_threads(),
                 os.cpu_count() or 0)
    except Exception as e:
        log.debug("torch threads config skipped: %s", e)
def _get_doctr_model():
    """Return the shared docTR OCR predictor, building it on first use.

    The predictor is stored in the module-level ``_doctr_model_cache`` so that
    later calls reuse the same object. Right before the first build, torch
    threading is configured through ``_configure_torch_threads()``.
    """
    global _doctr_model_cache
    if _doctr_model_cache is not None:
        return _doctr_model_cache
    # First call only: set up torch threads, then build the predictor.
    _configure_torch_threads()
    predictor_options = {
        "det_arch": "db_resnet50",
        "reco_arch": "crnn_vgg16_bn",
        "pretrained": True,
    }
    _doctr_model_cache = _doctr_ocr_predictor(**predictor_options)
    return _doctr_model_cache
try:
from detectors.hospital_filter import HospitalFilter
_HOSPITAL_FILTER_AVAILABLE = True
@@ -1093,16 +1144,6 @@ def _apply_admin_identifier_hits(full_raw: str, audit: List["PiiHit"], cfg: Dict
# ----------------- Extraction -----------------
_doctr_model_cache = None
def _get_doctr_model():
    """Return the cached docTR OCR predictor, creating it on the first call.

    The instance is kept in the module-level ``_doctr_model_cache`` and reused
    by every subsequent call.
    """
    global _doctr_model_cache
    model = _doctr_model_cache
    if model is None:
        # Nothing cached yet: build the predictor and remember it.
        model = _doctr_ocr_predictor(
            det_arch="db_resnet50",
            reco_arch="crnn_vgg16_bn",
            pretrained=True,
        )
        _doctr_model_cache = model
    return model
def _extract_page_layout_aware(page) -> str:
"""Extrait le texte d'une page PyMuPDF en gérant les layouts multi-colonnes.
@@ -1763,7 +1804,7 @@ def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
parts = SPLITTER.split(line, maxsplit=1)
# Une ligne narrative qui se termine par ` ;` ou ` :` produit un split
# avec une "value" vide. La "key" contient alors tout le narratif —
# incluant d'éventuels force_term (`CHUXX`, `CONCERTATION`…) qui doivent
# incluant d'éventuels force_term (`CHUXX`, sigle local...) qui doivent
# être masqués. Idem si la "key" fait plus de 5 mots : c'est très
# probablement du narratif, pas un libellé `Label : valeur`.
if len(parts) == 2 and parts[1].strip() and len(parts[0].split()) <= 5:
@@ -2977,6 +3018,11 @@ def _run_ner_on_original_text(
Returns:
Liste de NerDetection dédupliquée (par token+label+page+source).
"""
# H1 perf (D-19) : couvre le cas du PDF natif (texte riche, OCR sauté) où
# _get_doctr_model() n'est jamais appelé ; les NER torch (EDS-Pseudo, GLiNER)
# tourneraient alors mono-thread. Idempotent (no-op si déjà configuré par l'OCR).
_configure_torch_threads()
detections: List[NerDetection] = []
seen: set = set() # (token_lower, label, page_idx, source) pour dédoublonnage
@@ -4274,7 +4320,9 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
rects.extend(found)
all_rects[pno] = rects
# Phase 2 : rasterisation parallèle (ProcessPoolExecutor)
# Phase 2 : rasterisation parallèle (ProcessPoolExecutor hors EXE,
# ThreadPoolExecutor en EXE PyInstaller pour éviter de relancer la GUI).
raster_t0 = time.perf_counter()
n_pages = len(doc)
rects_as_tuples = {
pno: [(r.x0, r.y0, r.x1, r.y1) for r in rects]
@@ -4313,12 +4361,26 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
for pno in range(n_pages)
]
# Mode frozen (PyInstaller --onefile) : ProcessPoolExecutor relance l'exe
# et ouvre des fenêtres GUI fantômes → séquentiel obligatoire
if getattr(sys, 'frozen', False) or n_pages <= 2:
frozen = bool(getattr(sys, 'frozen', False))
disable_threads = os.getenv("ANON_DISABLE_RASTER_THREADS", "").lower() in {"1", "true", "yes", "on"}
if n_pages <= 2:
log.info("Raster PDF: mode=sequential pages=%d dpi=%d reason=small_pdf", n_pages, dpi)
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
elif frozen and not disable_threads:
n_workers = min(n_pages, os.cpu_count() or 4)
log.info("Raster PDF: mode=threads pages=%d workers=%d dpi=%d frozen=1", n_pages, n_workers, dpi)
try:
with ThreadPoolExecutor(max_workers=n_workers) as pool:
results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0])
except Exception as e:
log.warning("Raster PDF threaded mode failed, fallback sequential: %s", e)
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
elif frozen and disable_threads:
log.info("Raster PDF: mode=sequential pages=%d dpi=%d frozen=1 reason=env_disabled", n_pages, dpi)
results = sorted([_rasterize_page(t) for t in tasks], key=lambda x: x[0])
else:
n_workers = min(n_pages, os.cpu_count() or 4)
log.info("Raster PDF: mode=processes pages=%d workers=%d dpi=%d frozen=0", n_pages, n_workers, dpi)
with ProcessPoolExecutor(max_workers=n_workers) as pool:
results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0])
@@ -4331,6 +4393,7 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp
_apply_pseudo_xmp_metadata(out)
out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
out.close()
log.info("Raster PDF done: pages=%d output=%s duration=%.2fs", n_pages, out_pdf.name, time.perf_counter() - raster_t0)
# ----------------- VLM pour PDFs scannés -----------------
@@ -4424,15 +4487,31 @@ def process_pdf(
camembert_manager=None,
quarantine_mgr: Optional["QuarantineManager"] = None,
) -> Dict[str, str]:
perf_t0 = time.perf_counter()
last_mark = perf_t0
def _perf_mark(stage: str) -> None:
nonlocal last_mark
now = time.perf_counter()
log.info("PERF %s: stage=%s duration=%.2fs total=%.2fs",
pdf_path.name, stage, now - last_mark, now - perf_t0)
last_mark = now
log.info("PERF %s: start frozen=%s vector=%s raster=%s",
pdf_path.name, bool(getattr(sys, "frozen", False)), make_vector_redaction, also_make_raster_burn)
out_dir.mkdir(parents=True, exist_ok=True)
cfg = load_dictionaries(config_path)
_perf_mark("load_config")
pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path)
_perf_mark("extract_text_ocr")
# Q-1 B-3 : pré-flight texte vide. Si moins de SEUIL_TEXTE_MINI caractères
# extraits, c'est probablement un scan non-OCRisé ou un document corrompu.
# On NE traite PAS — quarantaine totale, le doc original est copié pour
# ré-essai manuel.
extracted_chars = sum(len(p) for p in pages_text)
log.info("PERF %s: pages=%d extracted_chars=%d ocr_used=%s ocr_pages=%d",
pdf_path.name, len(pages_text), extracted_chars, bool(ocr_used), len(ocr_word_map or {}))
if extracted_chars < SEUIL_TEXTE_MINI:
log.warning("Preflight failed for %s: only %d chars extracted (seuil=%d)",
pdf_path.name, extracted_chars, SEUIL_TEXTE_MINI)
@@ -4479,12 +4558,14 @@ def process_pdf(
gliner_mgr=gliner_manager,
camembert_mgr=camembert_manager,
)
_perf_mark("regex_rules")
# 1b) VLM (optionnel) — sur les PDFs scannés uniquement
if ocr_used and vlm_manager is not None and VlmManager is not None:
try:
if vlm_manager.is_loaded():
_apply_vlm_on_scanned_pdf(pdf_path, anon, ocr_word_map, vlm_manager)
_perf_mark("vlm_scan")
except Exception:
pass # dégradation gracieuse
@@ -4498,9 +4579,11 @@ def process_pdf(
else:
final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds)
anon.audit.extend(hf_hits)
_perf_mark("ner_optional")
# 3) Rescan selectif
final_text = selective_rescan(final_text, cfg=cfg)
_perf_mark("selective_rescan")
# 3a-bis) Nettoyage post-masquage : continuation orpheline d'un nom composé
# coupé par saut de ligne. Cas Trackare en colonnes : "NOCENT-EJNAINI"
@@ -4820,6 +4903,7 @@ def process_pdf(
r"DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|AGE|NIR|IBAN|OGC)\])\]+"
)
final_text = _RE_BRACKET_CLEAN.sub(r"\1", final_text)
_perf_mark("post_cleaning")
# 6) Whitelist absolue : filtrer les hits qui matchent un terme whitelist
# de la GUI (clé YAML whitelist_phrases). Filet de sécurité après tous les
@@ -4960,6 +5044,7 @@ def process_pdf(
for hit in audit_for_file:
f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
outputs = {"text": str(txt_path), "audit": str(audit_path)}
_perf_mark("write_text_audit")
# PDFs
if make_vector_redaction and fitz is not None:
@@ -4967,6 +5052,7 @@ def process_pdf(
try:
redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map)
outputs["pdf_vector"] = str(vec_path)
_perf_mark("pdf_vector")
except Exception as e:
# Q-1 D2/D3 : ne plus avaler silencieusement. Le texte (.pseudonymise.txt)
# est déjà sorti avant ce bloc.
@@ -5023,6 +5109,8 @@ def process_pdf(
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map)
outputs["pdf_raster"] = str(ras_path)
_perf_mark("pdf_raster")
log.info("PERF %s: done total=%.2fs outputs=%s", pdf_path.name, time.perf_counter() - perf_t0, sorted(outputs.keys()))
return outputs