- Nouveau module scripts/check_resources.py : état GPU/VRAM/RAM/CPU, require_resources() et wait_for_resources() avec polling - Intégré dans finetune_camembert_bio.py (8 Go VRAM + 8 Go RAM) - Intégré dans run_batch_silver_export.py (workers × 4 Go RAM) - Évaluateur : EVA et RAI ajoutés aux termes médicaux (score 100.0/100) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
#!/usr/bin/env python3
"""Parallel batch anonymisation of PDFs to enrich the silver dataset.

Processes ALL available PDFs in CPU mode (no VLM), with N parallel
workers. Each worker loads its own NER models.

Automatically resumes where it left off (skips already-processed files).

Usage:
    python run_batch_silver_export.py              # 6 workers (default)
    python run_batch_silver_export.py --workers 4  # 4 workers
"""
|
||
import sys
|
||
import os
|
||
import time
|
||
import argparse
|
||
import multiprocessing as mp
|
||
from pathlib import Path
|
||
|
||
# Make sibling project modules (anonymizer_core_refactored_onnx, *_manager)
# importable when the script is run directly from its own directory.
sys.path.insert(0, str(Path(__file__).parent))

# Input root: one sub-directory per OGC, each containing the source PDFs.
SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
# Output directory for the pseudonymised text exports.
OUTDIR = SRC / "anonymise_silver_extra"
# Dictionary configuration consumed by the anonymisation core.
CONFIG = Path("/home/dom/ai/anonymisation/config/dictionnaires.yml")

# PDFs already processed during audit 30 (to be excluded from this batch).
ALREADY_DONE_AUDIT30 = {
    "CONSULTATION ANESTHESISTE 23060661.pdf",
    "trackare-05000272-23074376_05000272_23074376.pdf",
    "CONSULTATION ANESTHESISTE 23056022.pdf",
    "trackare-BA042686-23090597_BA042686_23090597.pdf",
    "trackare-23000862-23018396_23000862_23018396.pdf",
    "LETTRE DE SORTIE 23087212.pdf",
    "CRO 23159905.pdf",
    "trackare-99246761-23159905_99246761_23159905.pdf",
    "CONSULTATION ANESTHESISTE 23139653.pdf",
    "CRO 23160703.pdf",
    "trackare-BA192486-23127395_BA192486_23127395.pdf",
    "BACTERIO 23232115.pdf",
    "CR consultation anesth-290-23025988.pdf",
    "trackare-05012965-23060770_05012965_23060770.pdf",
    "trackare-BA065989-23102874_BA065989_23102874.pdf",
    "trackare-BA127127-23135726_BA127127_23135726.pdf",
    "trackare-99252128-23177582_99252128_23177582.pdf",
    "trackare-BA171849-23214501_BA171849_23214501.pdf",
    "trackare-17015185-23043950_17015185_23043950.pdf",
    "CRH 60_23106634.pdf",
    "trackare-00260974-23070213_00260974_23070213.pdf",
    "trackare-BA067657-23076655_BA067657_23076655.pdf",
    "trackare-05012679-23098722_05012679_23098722.pdf",
    "trackare-11004431-23124019_11004431_23124019.pdf",
    "trackare-07003136-23135847_07003136_23135847.pdf",
    "trackare-13013848-23165708_13013848_23165708.pdf",
    "trackare-03020576-23175616_03020576_23175616.pdf",
    "trackare-BA093659-23074520_BA093659_23074520.pdf",
    "trackare-14025311-23034958_14025311_23034958.pdf",
    "trackare-BA121804-23016863_BA121804_23016863.pdf",
}

TIMEOUT_PER_FILE = 120  # hard cap, in seconds, spent on a single PDF

# Per-worker globals, populated exactly once by init_worker() in each
# spawned pool process, then read by process_one_pdf().
_worker_ner = None        # EdsPseudoManager (mandatory model)
_worker_gliner = None     # GlinerManager, or None if it failed to load
_worker_camembert = None  # CamembertNerManager, or None if it failed to load
_worker_id = None         # numeric id, used only as a log prefix
||
|
||
def init_worker(worker_id):
    """Load the NER models once in each pool worker process.

    Called by ``mp.Pool`` as the worker initializer. Populates the
    module-level ``_worker_*`` globals that :func:`process_one_pdf`
    reads. EDS-Pseudo is mandatory (a load failure propagates and kills
    the worker); GLiNER and CamemBERT-bio are optional and degrade to
    ``None`` on failure.

    Args:
        worker_id: Numeric id used only as a log prefix.
    """
    global _worker_ner, _worker_gliner, _worker_camembert, _worker_id
    _worker_id = worker_id

    # Cap ONNX/OpenMP threads per worker to avoid contention between
    # workers. max(1, ...) guards the divisor: on a single-core machine
    # mp.cpu_count() // 2 is 0, which previously raised ZeroDivisionError.
    n_threads = max(2, 32 // max(1, mp.cpu_count() // 2))
    os.environ["OMP_NUM_THREADS"] = str(n_threads)
    os.environ["MKL_NUM_THREADS"] = str(n_threads)

    # Import AFTER setting the env vars so the ONNX runtime picks them up.
    import anonymizer_core_refactored_onnx as core  # noqa: F401
    from eds_pseudo_manager import EdsPseudoManager
    from gliner_manager import GlinerManager
    from camembert_ner_manager import CamembertNerManager

    # Mandatory model: let any load failure propagate.
    _worker_ner = EdsPseudoManager()
    _worker_ner.load()
    print(f" [W{worker_id}] EDS-Pseudo chargé", flush=True)

    # Optional model: fall back to None so process_one_pdf can proceed.
    _worker_gliner = GlinerManager()
    try:
        _worker_gliner.load()
        print(f" [W{worker_id}] GLiNER chargé", flush=True)
    except Exception as e:
        print(f" [W{worker_id}] GLiNER indisponible ({e})", flush=True)
        _worker_gliner = None

    # Optional model: same best-effort policy as GLiNER.
    _worker_camembert = CamembertNerManager()
    try:
        _worker_camembert.load()
        print(f" [W{worker_id}] CamemBERT-bio chargé", flush=True)
    except Exception as e:
        print(f" [W{worker_id}] CamemBERT-bio indisponible ({e})", flush=True)
        _worker_camembert = None

    print(f" [W{worker_id}] Prêt (threads={n_threads})", flush=True)
||
def process_one_pdf(args):
    """Anonymise a single PDF. Executed inside a pool worker.

    Args:
        args: ``(pdf_path, idx, total)`` — packed into one tuple because
            ``Pool.imap_unordered`` passes a single argument per task.

    Returns:
        A status tuple: ``("OK"|"TIMEOUT"|"SKIP", name, idx, total)`` or
        ``("ERROR", name, idx, total, message)`` where *message* is the
        first 100 characters of the exception text.
    """
    pdf_path, idx, total = args
    import signal
    import anonymizer_core_refactored_onnx as core

    # OGC label is the prefix of the parent directory name.
    ogc = pdf_path.parent.name.split("_")[0]

    # Per-file timeout via SIGALRM (Unix-only). Safe here: each worker is
    # its own process, so alarms cannot clash between workers.
    def _timeout_handler(signum, frame):
        raise TimeoutError("Timeout")

    signal.signal(signal.SIGALRM, _timeout_handler)
    signal.alarm(TIMEOUT_PER_FILE)

    try:
        core.process_pdf(
            pdf_path=pdf_path,
            out_dir=OUTDIR,
            make_vector_redaction=False,
            also_make_raster_burn=False,
            config_path=CONFIG,
            use_hf=True,
            ner_manager=_worker_ner,
            ner_thresholds=None,
            ogc_label=ogc,
            vlm_manager=None,
            gliner_manager=_worker_gliner,
            camembert_manager=_worker_camembert,
        )
        return ("OK", pdf_path.name, idx, total)
    except TimeoutError:
        return ("TIMEOUT", pdf_path.name, idx, total)
    except Exception as e:
        err = str(e)
        # Encrypted/password-protected PDFs are expected input: skip them
        # instead of counting them as errors.
        if "encrypted" in err.lower() or "password" in err.lower():
            return ("SKIP", pdf_path.name, idx, total)
        return ("ERROR", pdf_path.name, idx, total, err[:100])
    finally:
        # Always cancel the pending alarm. Previously this was repeated in
        # every branch and skipped on non-Exception exits (e.g.
        # KeyboardInterrupt), which could leave a live SIGALRM behind.
        signal.alarm(0)
||
def main():
    """Entry point: check resources, collect pending PDFs, run the worker pool."""
    parser = argparse.ArgumentParser(description="Batch silver export parallèle")
    parser.add_argument("--workers", type=int, default=6,
                        help="Nombre de workers parallèles (défaut: 6)")
    args = parser.parse_args()

    n_workers = args.workers

    # Resource check (mostly RAM — each worker loads ~4 GB of NER models).
    from scripts.check_resources import require_resources
    ram_needed = n_workers * 4
    print(f"Vérification des ressources ({n_workers} workers × ~4 Go = ~{ram_needed} Go RAM)...")
    try:
        status = require_resources(ram_free_gb=ram_needed)
        print(f" RAM OK : {status.ram_available_gb:.1f} Go disponible")
        if status.gpu_available:
            print(f" GPU : {status.gpu_name}, {status.vram_free_mb} Mo VRAM libre")
        print()
    except RuntimeError as e:
        # require_resources raises RuntimeError when the machine lacks
        # the requested headroom; abort before loading any model.
        print(f"\n{e}", file=sys.stderr)
        sys.exit(1)

    # Collect all available PDFs, excluding the audit-30 set and any
    # previous output directories ("anonymise*").
    all_pdfs = []
    for ogc_dir in sorted(SRC.iterdir()):
        if not ogc_dir.is_dir() or ogc_dir.name.startswith("anonymise"):
            continue
        for pdf in ogc_dir.glob("*.pdf"):
            if pdf.name not in ALREADY_DONE_AUDIT30:
                all_pdfs.append(pdf)

    # Deterministic order: by OGC directory, then file name.
    all_pdfs.sort(key=lambda p: (p.parent.name, p.name))

    # Detect already-processed files (resume support): a PDF is done when
    # its ".pseudonymise.txt" export exists in OUTDIR.
    OUTDIR.mkdir(exist_ok=True)
    already_done = {
        p.name.replace(".pseudonymise.txt", ".pdf")
        for p in OUTDIR.glob("*.pseudonymise.txt")
    }
    pdfs_to_do = [p for p in all_pdfs if p.name not in already_done]

    print(f"PDFs disponibles: {len(all_pdfs)} (excl. audit_30)")
    print(f"Déjà traités: {len(already_done)}")
    print(f"Restant: {len(pdfs_to_do)}")
    print(f"Workers: {n_workers}")
    print(f"RAM par worker: ~4 Go (NER models)")
    print(f"RAM totale estimée: ~{n_workers * 4} Go\n")

    if not pdfs_to_do:
        print("Rien à faire.")
        return

    # Task tuples: (pdf_path, 1-based index, total count).
    tasks = [(pdf, i, len(pdfs_to_do)) for i, pdf in enumerate(pdfs_to_do, 1)]

    print(f"Chargement des modèles dans {n_workers} workers...", flush=True)

    # Pool with a per-worker initializer so models are loaded exactly once
    # per process. 'spawn' is used because fork + ONNX can misbehave.
    ctx = mp.get_context("spawn")

    ok = ko = skip_encrypted = skip_timeout = 0
    t0 = time.time()

    # NOTE(review): the comment in the original claimed workers start
    # sequentially to avoid a memory peak, but mp.Pool launches all
    # workers at once — expect peak RAM during model loading.
    with ctx.Pool(
        processes=n_workers,
        initializer=init_worker,
        initargs=(0,),  # simplified: every worker logs with prefix [W0]
    ) as pool:
        for result in pool.imap_unordered(process_one_pdf, tasks, chunksize=1):
            status = result[0]
            name = result[1]
            idx = result[2]
            total = result[3]

            elapsed = time.time() - t0
            # +1 counts the result being handled now, before the
            # per-status counters below are incremented.
            done = ok + ko + skip_encrypted + skip_timeout + 1

            if status == "OK":
                ok += 1
                rate = ok / elapsed * 3600 if elapsed > 0 else 0
                print(f"[{done}/{total}] {name} OK ({rate:.0f}/h)", flush=True)
            elif status == "TIMEOUT":
                skip_timeout += 1
                print(f"[{done}/{total}] {name} TIMEOUT", flush=True)
            elif status == "SKIP":
                skip_encrypted += 1
                print(f"[{done}/{total}] {name} SKIP (chiffré)", flush=True)
            else:
                ko += 1
                err_msg = result[4] if len(result) > 4 else "?"
                print(f"[{done}/{total}] {name} ERREUR: {err_msg}", flush=True)

            # Intermediate progress report every 50 files.
            if done % 50 == 0:
                remaining = (elapsed / done) * (total - done)
                print(f"\n --- Progression: {done}/{total} | OK: {ok} | "
                      f"Erreurs: {ko} | Timeout: {skip_timeout} | "
                      f"Débit: {ok/elapsed*3600:.0f}/h | "
                      f"Restant: {remaining/60:.0f}min ---\n", flush=True)

    # Final summary. total_pseudo re-counts on disk, so it includes files
    # done in earlier (resumed) runs, not just this one.
    elapsed = time.time() - t0
    total_pseudo = len(list(OUTDIR.glob("*.pseudonymise.txt")))
    print(f"\n{'='*60}")
    print(f"Terminé en {elapsed:.0f}s ({elapsed/60:.1f}min)")
    print(f"OK: {ok}, Chiffrés: {skip_encrypted}, Timeout: {skip_timeout}, Erreurs: {ko}")
    print(f"Total .pseudonymise.txt: {total_pseudo}")
    print(f"Débit moyen: {ok/elapsed*3600:.0f} fichiers/h")
    print(f"Sortie: {OUTDIR}")
||
# Script entry point — keep module import side-effect free.
if __name__ == "__main__":
    main()
|