#!/usr/bin/env python """ Script pour traiter un séjour avec ses documents cliniques. Usage: python scripts/process_stay.py --stay-id STAY001 --documents doc1.txt doc2.txt python scripts/process_stay.py --stay-id STAY001 --documents-dir /path/to/docs/ """ import argparse import sys from datetime import datetime from pathlib import Path try: import pypdf PDF_SUPPORT = True except ImportError: PDF_SUPPORT = False from pipeline_mco_pmsi.database.base import get_engine, create_all_tables, get_session from pipeline_mco_pmsi.database.models import StayDB, ClinicalDocumentDB from pipeline_mco_pmsi.pipeline import Pipeline from pipeline_mco_pmsi.models.clinical import ClinicalDocument def extract_text_from_pdf(file_path: Path) -> str: """Extrait le texte d'un fichier PDF.""" if not PDF_SUPPORT: raise ImportError("pypdf n'est pas installé. Installez-le avec: pip install pypdf") text_parts = [] try: with open(file_path, 'rb') as f: reader = pypdf.PdfReader(f) # Vérifier si le PDF est chiffré if reader.is_encrypted: # Tenter de déchiffrer avec mot de passe vide try: reader.decrypt('') except: raise RuntimeError(f"Le PDF est protégé par mot de passe: {file_path.name}") for page in reader.pages: text = page.extract_text() if text: text_parts.append(text) full_text = '\n\n'.join(text_parts) # Vérifier que du texte a été extrait if not full_text.strip(): raise RuntimeError(f"Aucun texte extrait du PDF (peut-être un PDF image): {file_path.name}") return full_text except RuntimeError: raise except Exception as e: raise RuntimeError(f"Erreur lors de l'extraction du PDF {file_path.name}: {e}") def load_document(file_path: Path, document_type: str = "cr_operatoire") -> str: """Charge le contenu d'un document (txt ou pdf).""" if file_path.suffix.lower() == '.pdf': return extract_text_from_pdf(file_path) else: with open(file_path, 'r', encoding='utf-8') as f: return f.read() def infer_document_type(filename: str) -> str: """Infère le type de document depuis le nom de fichier.""" filename_lower = filename.lower() if 'cro' in filename_lower or 'operatoire' in filename_lower: return 'cr_operatoire' elif 'crm' in filename_lower or 'medical' in filename_lower: return 'cr_medical' elif 'hospit' in filename_lower: return 'cr_hospitalisation' elif 'consult' in filename_lower: return 'cr_consultation' elif 'urgence' in filename_lower: return 'cr_urgences' elif 'imagerie' in filename_lower or 'radio' in filename_lower: return 'imagerie' elif 'bio' in filename_lower or 'labo' in filename_lower: return 'biologie' elif 'courrier' in filename_lower: return 'courrier' else: return 'autre' def main(): parser = argparse.ArgumentParser( description="Traite un séjour avec ses documents cliniques" ) parser.add_argument( '--stay-id', required=True, help="Identifiant du séjour (ex: STAY001)" ) parser.add_argument( '--documents', nargs='+', help="Liste de fichiers de documents à traiter" ) parser.add_argument( '--documents-dir', type=Path, help="Répertoire contenant les documents à traiter" ) parser.add_argument( '--specialty', default='chirurgie', help="Spécialité médicale (défaut: chirurgie)" ) parser.add_argument( '--admission-date', help="Date d'admission (format: YYYY-MM-DD)" ) parser.add_argument( '--discharge-date', help="Date de sortie (format: YYYY-MM-DD)" ) parser.add_argument( '--db-url', default='sqlite:///pipeline_mco_pmsi.db', help="URL de la base de données (défaut: SQLite local)" ) args = parser.parse_args() # Collecter les fichiers de documents document_files = [] if args.documents: document_files.extend([Path(d) for d in args.documents]) if args.documents_dir: if not args.documents_dir.exists(): print(f"❌ Répertoire introuvable: {args.documents_dir}") sys.exit(1) document_files.extend(args.documents_dir.glob('*.txt')) document_files.extend(args.documents_dir.glob('*.pdf')) if not document_files: print("❌ Aucun document à traiter. Utilisez --documents ou --documents-dir") sys.exit(1) # Vérifier le support PDF si nécessaire has_pdf = any(f.suffix.lower() == '.pdf' for f in document_files) if has_pdf and not PDF_SUPPORT: print("⚠️ Des fichiers PDF ont été détectés mais pypdf n'est pas installé.") print(" Installez-le avec: pip install pypdf") print(" Les fichiers PDF seront ignorés.\n") print(f"📄 {len(document_files)} document(s) à traiter") # Initialiser la base de données print(f"🗄️ Connexion à la base de données: {args.db_url}") engine = get_engine(args.db_url) create_all_tables(engine) # Créer ou récupérer le séjour with get_session(engine) as session: stay = session.query(StayDB).filter(StayDB.stay_id == args.stay_id).first() if not stay: print(f"✨ Création du séjour {args.stay_id}") # Dates par défaut admission_date = datetime.now() if args.admission_date: admission_date = datetime.strptime(args.admission_date, '%Y-%m-%d') discharge_date = datetime.now() if args.discharge_date: discharge_date = datetime.strptime(args.discharge_date, '%Y-%m-%d') stay = StayDB( stay_id=args.stay_id, admission_date=admission_date, discharge_date=discharge_date, specialty=args.specialty, status='processing' ) session.add(stay) session.flush() else: print(f"📋 Séjour {args.stay_id} existant trouvé") # Charger les documents documents = [] skipped_files = [] # Récupérer les document_ids existants pour éviter les doublons existing_doc_ids = {doc.document_id for doc in session.query(ClinicalDocumentDB).filter( ClinicalDocumentDB.stay_id == stay.id ).all()} doc_counter = len(existing_doc_ids) + 1 for doc_file in document_files: print(f"📖 Chargement: {doc_file.name}") # Ignorer les fichiers .oxps (format Microsoft non supporté) if doc_file.suffix.lower() == '.oxps': print(f"⚠️ Format .oxps non supporté. Ignoré: {doc_file.name}") skipped_files.append(doc_file.name) continue try: content = load_document(doc_file) # Vérifier que le contenu n'est pas vide if not content.strip(): print(f"⚠️ Document vide. Ignoré: {doc_file.name}") skipped_files.append(doc_file.name) continue doc_type = infer_document_type(doc_file.name) doc_id = f"{args.stay_id}_DOC{doc_counter:03d}" # Vérifier si le document existe déjà if doc_id in existing_doc_ids: print(f"⚠️ Document déjà existant. Ignoré: {doc_file.name}") skipped_files.append(doc_file.name) continue # Créer le document en base doc_db = ClinicalDocumentDB( stay_id=stay.id, document_id=doc_id, document_type=doc_type, content=content, creation_date=datetime.now(), author="Import automatique", priority=doc_counter ) session.add(doc_db) # Créer le modèle Pydantic pour le pipeline doc = ClinicalDocument( document_id=doc_db.document_id, document_type=doc_type, content=content, creation_date=datetime.now(), author="Import automatique", priority=doc_counter ) documents.append(doc) doc_counter += 1 except Exception as e: print(f"❌ Erreur lors du chargement de {doc_file.name}: {e}") skipped_files.append(doc_file.name) continue session.commit() print(f"✅ {len(documents)} document(s) enregistré(s)") if skipped_files: print(f"⚠️ {len(skipped_files)} fichier(s) ignoré(s): {', '.join(skipped_files)}") # Traiter le séjour avec le pipeline print(f"\n🚀 Traitement du séjour {args.stay_id}...") print("⏳ Cela peut prendre quelques minutes...\n") if not documents: print("❌ Aucun document valide à traiter") sys.exit(1) try: # Créer une session pour le pipeline with get_session(engine) as session: # Créer le RAG engine avec un ReferentielsManager mock from pipeline_mco_pmsi.rag.rag_engine import RAGEngine from pipeline_mco_pmsi.rag.referentiels_manager import ReferentielsManager from pipeline_mco_pmsi.models.metadata import StayMetadata # Créer un ReferentielsManager (mock pour l'instant) referentiels_manager = ReferentielsManager(data_dir=Path("data/referentiels")) rag_engine = RAGEngine(referentiels_manager=referentiels_manager) # Créer le pipeline pipeline = Pipeline( db_session=session, rag_engine=rag_engine ) # Créer les métadonnées du séjour stay_metadata = StayMetadata( stay_id=args.stay_id, admission_date=stay.admission_date, discharge_date=stay.discharge_date, specialty=stay.specialty ) result = pipeline.process_stay( documents=documents, stay_metadata=stay_metadata ) print("\n✅ Traitement terminé !") print(f"\n📊 Résultats:") print(f" - DP: {result.coding_proposal.dp.code if result.coding_proposal.dp else 'Non proposé'}") print(f" - DR: {result.coding_proposal.dr.code if result.coding_proposal.dr else 'Non proposé'}") print(f" - DAS: {len(result.coding_proposal.das)} code(s)") print(f" - CCAM: {len(result.coding_proposal.ccam)} acte(s)") print(f" - Questions: {len(result.questions)}") print(f" - Problèmes de validation: {len(result.validation_issues)}") if result.verification_result: print(f" - Décision vérificateur: {result.verification_result.decision}") print(f"\n🌐 Consultez les résultats sur: http://localhost:8001") print(f" Recherchez le séjour: {args.stay_id}") except Exception as e: print(f"\n❌ Erreur lors du traitement: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()