"""CLI + orchestrateur du pipeline d'anonymisation et extraction CIM-10.""" from __future__ import annotations import argparse import json import logging import sys import time from pathlib import Path from .anonymization.anonymizer import Anonymizer from .config import ANONYMIZED_DIR, REPORTS_DIR, STRUCTURED_DIR, AnonymizationReport, DossierMedical from .extraction.document_classifier import classify from .extraction.crh_parser import parse_crh from .extraction.pdf_extractor import extract_text from .extraction.trackare_parser import parse_trackare from .medical.cim10_extractor import extract_medical_info logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger(__name__) # Flags globaux _use_edsnlp = True _use_rag = True def process_pdf(pdf_path: Path) -> tuple[str, DossierMedical, AnonymizationReport]: """Traite un PDF : extraction → parsing → anonymisation → extraction CIM-10.""" t0 = time.time() logger.info("Traitement de %s", pdf_path.name) # 1. Extraction texte raw_text = extract_text(pdf_path) logger.info(" Texte extrait : %d caractères", len(raw_text)) # 2. Classification doc_type = classify(raw_text) logger.info(" Type de document : %s", doc_type) # 3. Parsing if doc_type == "trackare": parsed = parse_trackare(raw_text) else: parsed = parse_crh(raw_text) # 4. Anonymisation anonymizer = Anonymizer(parsed_data=parsed) anonymized_text = anonymizer.anonymize(raw_text) report = anonymizer.report report.source_file = pdf_path.name logger.info( " Anonymisation : %d remplacements (regex=%d, ner=%d, sweep=%d)", report.total_replacements, report.regex_replacements, report.ner_replacements, report.sweep_replacements, ) # 5. Analyse edsnlp (optionnelle) edsnlp_result = None if _use_edsnlp: edsnlp_result = _run_edsnlp(anonymized_text) # 6. Extraction médicale CIM-10 dossier = extract_medical_info(parsed, anonymized_text, edsnlp_result, use_rag=_use_rag) dossier.source_file = pdf_path.name dossier.document_type = doc_type dossier.processing_time_s = round(time.time() - t0, 2) logger.info(" DP : %s", dossier.diagnostic_principal) logger.info(" DAS : %d, Actes : %d", len(dossier.diagnostics_associes), len(dossier.actes_ccam)) logger.info(" Temps de traitement : %.2fs", dossier.processing_time_s) return anonymized_text, dossier, report def _run_edsnlp(text: str): """Exécute l'analyse edsnlp avec fallback gracieux.""" try: from .medical.edsnlp_pipeline import analyze, is_available if not is_available(): logger.info(" edsnlp non disponible, utilisation du mode regex seul") return None result = analyze(text) logger.info( " edsnlp : %d CIM-10, %d médicaments, %d dates", len(result.cim10_entities), len(result.drug_entities), len(result.date_entities), ) return result except Exception: logger.warning(" edsnlp : erreur lors de l'analyse, fallback regex", exc_info=True) return None def write_outputs( stem: str, anonymized_text: str, dossier: DossierMedical, report: AnonymizationReport, subdir: str | None = None, ) -> None: """Écrit les fichiers de sortie.""" anon_dir = ANONYMIZED_DIR / subdir if subdir else ANONYMIZED_DIR struct_dir = STRUCTURED_DIR / subdir if subdir else STRUCTURED_DIR rep_dir = REPORTS_DIR / subdir if subdir else REPORTS_DIR anon_dir.mkdir(parents=True, exist_ok=True) struct_dir.mkdir(parents=True, exist_ok=True) rep_dir.mkdir(parents=True, exist_ok=True) # Texte anonymisé anon_path = anon_dir / f"{stem}_anonymized.txt" anon_path.write_text(anonymized_text, encoding="utf-8") logger.info(" → %s", anon_path) # JSON structuré json_path = struct_dir / f"{stem}_cim10.json" json_path.write_text( dossier.model_dump_json(indent=2, exclude_none=True), encoding="utf-8", ) logger.info(" → %s", json_path) # Rapport d'anonymisation report_path = rep_dir / f"{stem}_report.json" report_path.write_text( report.model_dump_json(indent=2), encoding="utf-8", ) logger.info(" → %s", report_path) def main(input_path: str | None = None) -> None: """Point d'entrée principal.""" global _use_edsnlp, _use_rag parser = argparse.ArgumentParser( description="Anonymisation de documents médicaux PDF et extraction CIM-10", ) parser.add_argument( "input", nargs="?", default=input_path or "input/", help="Chemin vers un PDF ou un dossier de PDFs (défaut: input/)", ) parser.add_argument( "--no-ner", action="store_true", help="Désactiver la phase NER (plus rapide, moins précis)", ) parser.add_argument( "--no-edsnlp", action="store_true", help="Désactiver l'analyse edsnlp (mode regex seul)", ) parser.add_argument( "--no-rag", action="store_true", help="Désactiver l'enrichissement RAG (FAISS + Ollama)", ) args = parser.parse_args() if args.no_ner: # Monkey-patch pour désactiver NER from .anonymization import ner_anonymizer ner_anonymizer.extract_person_entities = lambda text: [] if args.no_edsnlp: _use_edsnlp = False if args.no_rag: _use_rag = False input_p = Path(args.input) # Collecte des groupes (pdfs, subdir) à traiter groups: list[tuple[list[Path], str | None]] = [] if input_p.is_file(): groups.append(([input_p], None)) elif input_p.is_dir(): # PDFs à la racine root_pdfs = sorted(input_p.glob("*.pdf")) if root_pdfs: groups.append((root_pdfs, None)) # Sous-dossiers directs (un seul niveau) for child in sorted(input_p.iterdir()): if child.is_dir(): sub_pdfs = sorted(child.glob("*.pdf")) if sub_pdfs: groups.append((sub_pdfs, child.name)) else: logger.error("Chemin introuvable : %s", input_p) sys.exit(1) total = sum(len(pdfs) for pdfs, _ in groups) if total == 0: logger.warning("Aucun PDF trouvé dans %s", input_p) sys.exit(0) logger.info("Traitement de %d PDF(s)...", total) for pdfs, subdir in groups: if subdir: logger.info("--- Dossier %s (%d PDFs) ---", subdir, len(pdfs)) for pdf_path in pdfs: try: anonymized_text, dossier, report = process_pdf(pdf_path) stem = pdf_path.stem.replace(" ", "_") write_outputs(stem, anonymized_text, dossier, report, subdir=subdir) except Exception: logger.exception("Erreur lors du traitement de %s", pdf_path.name) logger.info("Terminé.") if __name__ == "__main__": main()