Files
anonymisation/run_batch_silver_export.py
Domi31tls d957e72aff feat: check GPU/RAM resources before execution + evaluator 100/100
- New module scripts/check_resources.py: GPU/VRAM/RAM/CPU status,
  require_resources() and wait_for_resources() with polling (a rough,
  hypothetical sketch of this interface is shown below)
- Integrated into finetune_camembert_bio.py (8 GB VRAM + 8 GB RAM)
- Integrated into run_batch_silver_export.py (workers × 4 GB RAM)
- Evaluator: EVA and RAI added to the medical terms (score 100.0/100)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 10:27:33 +01:00
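Below is a minimal, hypothetical sketch of the scripts/check_resources.py interface that this script relies on: require_resources() raising RuntimeError when the requested headroom is missing, a status object exposing ram_available_gb, gpu_available, gpu_name and vram_free_mb, and a polling wait_for_resources(). It assumes psutil for RAM and an optional torch/CUDA probe for VRAM; the actual module in the repository may be implemented differently.

# Hypothetical sketch only; the real scripts/check_resources.py may differ.
from dataclasses import dataclass
import time

import psutil  # assumed dependency for RAM probing


@dataclass
class ResourceStatus:
    ram_available_gb: float
    gpu_available: bool = False
    gpu_name: str = ""
    vram_free_mb: int = 0


def _probe() -> ResourceStatus:
    """Snapshot available RAM and, if torch + CUDA are present, free VRAM."""
    ram_gb = psutil.virtual_memory().available / 1024**3
    status = ResourceStatus(ram_available_gb=ram_gb)
    try:
        import torch
        if torch.cuda.is_available():
            free_bytes, _total_bytes = torch.cuda.mem_get_info()
            status.gpu_available = True
            status.gpu_name = torch.cuda.get_device_name(0)
            status.vram_free_mb = int(free_bytes / 1024**2)
    except ImportError:
        pass  # no GPU stack installed: report RAM only
    return status


def require_resources(ram_free_gb: float = 0, vram_free_gb: float = 0) -> ResourceStatus:
    """Return the current status, or raise RuntimeError if headroom is insufficient."""
    status = _probe()
    if status.ram_available_gb < ram_free_gb:
        raise RuntimeError(
            f"Insufficient RAM: {status.ram_available_gb:.1f} GB free, {ram_free_gb} GB required"
        )
    if vram_free_gb and status.vram_free_mb < vram_free_gb * 1024:
        raise RuntimeError(
            f"Insufficient VRAM: {status.vram_free_mb} MB free, {vram_free_gb} GB required"
        )
    return status


def wait_for_resources(ram_free_gb: float = 0, vram_free_gb: float = 0,
                       poll_s: float = 30, timeout_s: float = 3600) -> ResourceStatus:
    """Poll until the requested resources are free, re-raising after timeout_s."""
    deadline = time.time() + timeout_s
    while True:
        try:
            return require_resources(ram_free_gb, vram_free_gb)
        except RuntimeError:
            if time.time() >= deadline:
                raise
            time.sleep(poll_s)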

266 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Batch anonymisation parallèle de PDFs pour enrichir le dataset silver.
Traite TOUS les PDFs disponibles en mode CPU (sans VLM), avec N workers
parallèles. Chaque worker charge ses propres modèles NER.
Reprend automatiquement là où il s'est arrêté (skip les déjà traités).
Usage:
python run_batch_silver_export.py # 6 workers (défaut)
python run_batch_silver_export.py --workers 4 # 4 workers
"""
import sys
import os
import time
import argparse
import multiprocessing as mp
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
OUTDIR = SRC / "anonymise_silver_extra"
CONFIG = Path("/home/dom/ai/anonymisation/config/dictionnaires.yml")
# PDFs already processed in audit 30 (to exclude)
ALREADY_DONE_AUDIT30 = {
"CONSULTATION ANESTHESISTE 23060661.pdf",
"trackare-05000272-23074376_05000272_23074376.pdf",
"CONSULTATION ANESTHESISTE 23056022.pdf",
"trackare-BA042686-23090597_BA042686_23090597.pdf",
"trackare-23000862-23018396_23000862_23018396.pdf",
"LETTRE DE SORTIE 23087212.pdf",
"CRO 23159905.pdf",
"trackare-99246761-23159905_99246761_23159905.pdf",
"CONSULTATION ANESTHESISTE 23139653.pdf",
"CRO 23160703.pdf",
"trackare-BA192486-23127395_BA192486_23127395.pdf",
"BACTERIO 23232115.pdf",
"CR consultation anesth-290-23025988.pdf",
"trackare-05012965-23060770_05012965_23060770.pdf",
"trackare-BA065989-23102874_BA065989_23102874.pdf",
"trackare-BA127127-23135726_BA127127_23135726.pdf",
"trackare-99252128-23177582_99252128_23177582.pdf",
"trackare-BA171849-23214501_BA171849_23214501.pdf",
"trackare-17015185-23043950_17015185_23043950.pdf",
"CRH 60_23106634.pdf",
"trackare-00260974-23070213_00260974_23070213.pdf",
"trackare-BA067657-23076655_BA067657_23076655.pdf",
"trackare-05012679-23098722_05012679_23098722.pdf",
"trackare-11004431-23124019_11004431_23124019.pdf",
"trackare-07003136-23135847_07003136_23135847.pdf",
"trackare-13013848-23165708_13013848_23165708.pdf",
"trackare-03020576-23175616_03020576_23175616.pdf",
"trackare-BA093659-23074520_BA093659_23074520.pdf",
"trackare-14025311-23034958_14025311_23034958.pdf",
"trackare-BA121804-23016863_BA121804_23016863.pdf",
}

TIMEOUT_PER_FILE = 120  # max seconds per PDF

# Per-worker globals (initialised once per worker process)
_worker_ner = None
_worker_gliner = None
_worker_camembert = None
_worker_id = None


def init_worker(worker_id):
    """Initialise the NER models in each worker (called only once per worker)."""
    global _worker_ner, _worker_gliner, _worker_camembert, _worker_id
    _worker_id = worker_id

    # Limit ONNX/OpenMP threads per worker to avoid contention
    n_threads = max(2, 32 // (mp.cpu_count() // 2))  # spread the threads evenly
    os.environ["OMP_NUM_THREADS"] = str(n_threads)
    os.environ["MKL_NUM_THREADS"] = str(n_threads)

    import anonymizer_core_refactored_onnx as core  # noqa: F401
    from eds_pseudo_manager import EdsPseudoManager
    from gliner_manager import GlinerManager
    from camembert_ner_manager import CamembertNerManager

    _worker_ner = EdsPseudoManager()
    _worker_ner.load()
    print(f" [W{worker_id}] EDS-Pseudo chargé", flush=True)

    _worker_gliner = GlinerManager()
    try:
        _worker_gliner.load()
        print(f" [W{worker_id}] GLiNER chargé", flush=True)
    except Exception as e:
        print(f" [W{worker_id}] GLiNER indisponible ({e})", flush=True)
        _worker_gliner = None

    _worker_camembert = CamembertNerManager()
    try:
        _worker_camembert.load()
        print(f" [W{worker_id}] CamemBERT-bio chargé", flush=True)
    except Exception as e:
        print(f" [W{worker_id}] CamemBERT-bio indisponible ({e})", flush=True)
        _worker_camembert = None

    print(f" [W{worker_id}] Prêt (threads={n_threads})", flush=True)


def process_one_pdf(args):
    """Process a single PDF. Called by the worker pool."""
    pdf_path, idx, total = args
    import signal
    import anonymizer_core_refactored_onnx as core

    ogc = pdf_path.parent.name.split("_")[0]

    # Per-file timeout via SIGALRM
    def _timeout_handler(signum, frame):
        raise TimeoutError("Timeout")

    signal.signal(signal.SIGALRM, _timeout_handler)
    signal.alarm(TIMEOUT_PER_FILE)
    try:
        core.process_pdf(
            pdf_path=pdf_path,
            out_dir=OUTDIR,
            make_vector_redaction=False,
            also_make_raster_burn=False,
            config_path=CONFIG,
            use_hf=True,
            ner_manager=_worker_ner,
            ner_thresholds=None,
            ogc_label=ogc,
            vlm_manager=None,
            gliner_manager=_worker_gliner,
            camembert_manager=_worker_camembert,
        )
        signal.alarm(0)
        return ("OK", pdf_path.name, idx, total)
    except TimeoutError:
        signal.alarm(0)
        return ("TIMEOUT", pdf_path.name, idx, total)
    except Exception as e:
        signal.alarm(0)
        err = str(e)
        if "encrypted" in err.lower() or "password" in err.lower():
            return ("SKIP", pdf_path.name, idx, total)
        return ("ERROR", pdf_path.name, idx, total, err[:100])


def main():
    parser = argparse.ArgumentParser(description="Batch silver export parallèle")
    parser.add_argument("--workers", type=int, default=6,
                        help="Nombre de workers parallèles (défaut: 6)")
    args = parser.parse_args()
    n_workers = args.workers

    # Resource check (mainly RAM: each worker loads ~4 GB of NER models)
    from scripts.check_resources import require_resources
    ram_needed = n_workers * 4
    print(f"Vérification des ressources ({n_workers} workers × ~4 Go = ~{ram_needed} Go RAM)...")
    try:
        status = require_resources(ram_free_gb=ram_needed)
        print(f" RAM OK : {status.ram_available_gb:.1f} Go disponible")
        if status.gpu_available:
            print(f" GPU : {status.gpu_name}, {status.vram_free_mb} Mo VRAM libre")
        print()
    except RuntimeError as e:
        print(f"\n{e}", file=sys.stderr)
        sys.exit(1)
    # Collect all available PDFs (excluding audit_30)
    all_pdfs = []
    for ogc_dir in sorted(SRC.iterdir()):
        if not ogc_dir.is_dir() or ogc_dir.name.startswith("anonymise"):
            continue
        for pdf in ogc_dir.glob("*.pdf"):
            if pdf.name not in ALREADY_DONE_AUDIT30:
                all_pdfs.append(pdf)
    all_pdfs.sort(key=lambda p: (p.parent.name, p.name))

    # Detect already-processed files (resume support)
    OUTDIR.mkdir(exist_ok=True)
    already_done = {
        p.name.replace(".pseudonymise.txt", ".pdf")
        for p in OUTDIR.glob("*.pseudonymise.txt")
    }
    pdfs_to_do = [p for p in all_pdfs if p.name not in already_done]

    print(f"PDFs disponibles: {len(all_pdfs)} (excl. audit_30)")
    print(f"Déjà traités: {len(already_done)}")
    print(f"Restant: {len(pdfs_to_do)}")
    print(f"Workers: {n_workers}")
    print("RAM par worker: ~4 Go (NER models)")
    print(f"RAM totale estimée: ~{n_workers * 4} Go\n")

    if not pdfs_to_do:
        print("Rien à faire.")
        return

    # Prepare task arguments: (pdf_path, index, total)
    tasks = [(pdf, i, len(pdfs_to_do)) for i, pdf in enumerate(pdfs_to_do, 1)]
print(f"Chargement des modèles dans {n_workers} workers...", flush=True)
# Créer le pool avec initialisation des modèles par worker
# On utilise mp.Pool avec initializer pour charger les modèles une seule fois
# Note: fork + ONNX peut poser problème, on utilise 'spawn'
ctx = mp.get_context("spawn")
ok = ko = skip_encrypted = skip_timeout = 0
t0 = time.time()
# Lancer les workers séquentiellement pour l'init (éviter pic mémoire)
# puis traiter en parallèle
with ctx.Pool(
processes=n_workers,
initializer=init_worker,
initargs=(0,), # worker_id simplifié
) as pool:
        for result in pool.imap_unordered(process_one_pdf, tasks, chunksize=1):
            status = result[0]
            name = result[1]
            idx = result[2]
            total = result[3]
            elapsed = time.time() - t0
            done = ok + ko + skip_encrypted + skip_timeout + 1
            if status == "OK":
                ok += 1
                rate = ok / elapsed * 3600 if elapsed > 0 else 0
                print(f"[{done}/{total}] {name} OK ({rate:.0f}/h)", flush=True)
            elif status == "TIMEOUT":
                skip_timeout += 1
                print(f"[{done}/{total}] {name} TIMEOUT", flush=True)
            elif status == "SKIP":
                skip_encrypted += 1
                print(f"[{done}/{total}] {name} SKIP (chiffré)", flush=True)
            else:
                ko += 1
                err_msg = result[4] if len(result) > 4 else "?"
                print(f"[{done}/{total}] {name} ERREUR: {err_msg}", flush=True)
            # Intermediate progress report every 50 files
            if done % 50 == 0:
                remaining = (elapsed / done) * (total - done)
                print(f"\n --- Progression: {done}/{total} | OK: {ok} | "
                      f"Erreurs: {ko} | Timeout: {skip_timeout} | "
                      f"Débit: {ok/elapsed*3600:.0f}/h | "
                      f"Restant: {remaining/60:.0f}min ---\n", flush=True)
    elapsed = time.time() - t0
    total_pseudo = len(list(OUTDIR.glob("*.pseudonymise.txt")))
    print(f"\n{'='*60}")
    print(f"Terminé en {elapsed:.0f}s ({elapsed/60:.1f}min)")
    print(f"OK: {ok}, Chiffrés: {skip_encrypted}, Timeout: {skip_timeout}, Erreurs: {ko}")
    print(f"Total .pseudonymise.txt: {total_pseudo}")
    print(f"Débit moyen: {ok/elapsed*3600:.0f} fichiers/h")
    print(f"Sortie: {OUTDIR}")


if __name__ == "__main__":
    main()