""" Tests unitaires pour l'AuditLogger. Ces tests vérifient que l'AuditLogger enregistre correctement tous les éléments nécessaires pour la traçabilité complète, et que les exports filtrent correctement les DIP. Exigences testées : 5.1, 5.2, 5.3, 5.5, 5.6, 5.7, 5.8, 5.10, 11.4 """ import pytest from datetime import datetime, timedelta from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from pipeline_mco_pmsi.audit.audit_logger import AuditLogger, AuditTrail from pipeline_mco_pmsi.database.base import Base from pipeline_mco_pmsi.database.models import ( StayDB, ClinicalDocumentDB, ClinicalFactDB, CodeDB, EvidenceDB, VerificationResultDB, ValidationIssueDB, QuestionDB, GroupageResultDB, TIMCorrectionDB, ) from pipeline_mco_pmsi.models.clinical import ( ClinicalDocument, ClinicalFact, Evidence, Qualifier, Span, ) from pipeline_mco_pmsi.models.coding import ( Code, CodingProposal, DIMError, VerificationResult, ) from pipeline_mco_pmsi.models.metadata import ( ModelVersion, ReferentielVersion, StayMetadata, VersionInfo, ) from pipeline_mco_pmsi.processors.pii_protector import PIIProtector @pytest.fixture def db_session(): """Crée une session de base de données en mémoire pour les tests.""" engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) SessionLocal = sessionmaker(bind=engine) session = SessionLocal() yield session session.close() @pytest.fixture def pii_protector(): """Crée un PIIProtector pour les tests.""" return PIIProtector(use_ner=False) # Désactiver NER pour tests rapides @pytest.fixture def audit_logger(db_session, pii_protector): """Crée un AuditLogger pour les tests.""" return AuditLogger(db_session=db_session, pii_protector=pii_protector) @pytest.fixture def sample_stay_metadata(): """Crée des métadonnées de séjour pour les tests.""" return StayMetadata( stay_id="stay_001", admission_date=datetime(2024, 1, 15), discharge_date=datetime(2024, 1, 20), specialty="Cardiologie", unit="USI", age=65, 
sex="M", ) @pytest.fixture def sample_version_info(): """Crée des informations de version pour les tests.""" return VersionInfo( model_name="test_model", model_tag="v1.0", model_digest="a" * 64, prompt_version="v1.0", prompt_hash="b" * 64, referentiels={ "cim10": ReferentielVersion( type="cim10", version="2026", import_date=datetime(2024, 1, 1), file_hash="c" * 64, chunk_count=1000, index_hash="d" * 64, ) }, rules_version="v1.0", rules_hash="e" * 64, groupage_version="v2026", inference_params={"temperature": 0.7, "top_p": 0.9}, ) @pytest.fixture def sample_documents(): """Crée des documents cliniques pour les tests.""" return [ ClinicalDocument( document_id="doc_001", document_type="cr_medical", content="Patient Jean Dupont, né le 15/03/1960. Diagnostic: Infarctus du myocarde.", creation_date=datetime(2024, 1, 15, 10, 0), author="Dr. Martin", priority=1, ), ClinicalDocument( document_id="doc_002", document_type="imagerie", content="Échographie cardiaque: Fraction d'éjection à 45%.", creation_date=datetime(2024, 1, 16, 14, 30), author="Dr. 
Leroy", priority=3, ), ] @pytest.fixture def sample_facts(): """Crée des faits cliniques pour les tests.""" return [ ClinicalFact( fact_id="fact_001", type="diagnostic", text="Infarctus du myocarde", qualifier=Qualifier( certainty="affirmé", markers=[], confidence=0.95, ), temporality="actuel", evidence=Evidence( document_id="doc_001", span=Span(start=50, end=72), text="Infarctus du myocarde", context="Diagnostic: Infarctus du myocarde.", ), confidence=0.95, ), ClinicalFact( fact_id="fact_002", type="diagnostic", text="Insuffisance cardiaque", qualifier=Qualifier( certainty="suspecté", markers=["possible"], confidence=0.6, ), temporality="actuel", evidence=Evidence( document_id="doc_002", span=Span(start=30, end=50), text="Fraction d'éjection à 45%", context="Échographie: Fraction d'éjection à 45%.", ), confidence=0.6, ), ] @pytest.fixture def sample_coding_proposal(sample_facts): """Crée une proposition de codage pour les tests.""" dp_code = Code( code="I21.9", label="Infarctus aigu du myocarde, sans précision", type="dp", evidence=[sample_facts[0].evidence], confidence=0.95, reasoning="Code basé sur le diagnostic d'infarctus du myocarde", referentiel_version="2026", ) return CodingProposal( stay_id="stay_001", dp=dp_code, dr=None, das=[], ccam=[], reasoning="Proposition basée sur les faits cliniques extraits", model_version=ModelVersion( model_name="test_model", model_tag="v1.0", model_digest="a" * 64, ), prompt_version="v1.0", ) @pytest.fixture def sample_verification_result(): """Crée un résultat de vérification pour les tests.""" return VerificationResult( stay_id="stay_001", decision="accept", dim_errors=[], contradictions=[], alternatives=[], reasoning="Proposition validée sans erreur détectée", model_version=ModelVersion( model_name="test_model_verificateur", model_tag="v1.0", model_digest="f" * 64, ), prompt_version="v2.0", # Différent du codeur ) def setup_stay_in_db(db_session, stay_metadata): """Configure un séjour dans la base de données pour les 
tests.""" stay_db = StayDB( stay_id=stay_metadata.stay_id, admission_date=stay_metadata.admission_date, discharge_date=stay_metadata.discharge_date, specialty=stay_metadata.specialty, unit=stay_metadata.unit, age=stay_metadata.age, sex=stay_metadata.sex, ) db_session.add(stay_db) db_session.commit() return stay_db # Tests pour log_coding_decision (Exigence 5.1, 5.3, 5.5, 5.7) def test_log_coding_decision_creates_audit_record( audit_logger, db_session, sample_stay_metadata, sample_coding_proposal, sample_verification_result, sample_version_info, ): """Test que log_coding_decision crée un enregistrement d'audit.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.log_coding_decision( stay_id=sample_stay_metadata.stay_id, proposal=sample_coding_proposal, verification=sample_verification_result, versions=sample_version_info, ) # Verify assert audit_record.stay_id == sample_stay_metadata.stay_id assert audit_record.event_type == "coding" assert audit_record.actor == "system" assert "proposal" in audit_record.data assert "verification" in audit_record.data assert audit_record.versions == sample_version_info def test_log_coding_decision_persists_to_database( audit_logger, db_session, sample_stay_metadata, sample_coding_proposal, sample_verification_result, sample_version_info, ): """Test que log_coding_decision persiste dans la base de données.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.log_coding_decision( stay_id=sample_stay_metadata.stay_id, proposal=sample_coding_proposal, verification=sample_verification_result, versions=sample_version_info, ) # Verify - Récupérer depuis la DB from pipeline_mco_pmsi.database.models import AuditRecordDB audit_db = ( db_session.query(AuditRecordDB) .filter(AuditRecordDB.record_id == audit_record.record_id) .first() ) assert audit_db is not None assert audit_db.event_type == "coding" assert audit_db.model_name == 
sample_version_info.model_name assert audit_db.prompt_version == sample_version_info.prompt_version # Tests pour log_tim_correction (Exigence 5.6) def test_log_tim_correction_records_user_and_timestamp( audit_logger, db_session, sample_stay_metadata, sample_version_info, ): """Test que log_tim_correction enregistre user_id et timestamp.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute before_time = datetime.utcnow() audit_record = audit_logger.log_tim_correction( stay_id=sample_stay_metadata.stay_id, original_code="I21.9", corrected_code="I21.0", user_id="user_123", comment="Correction après relecture", versions=sample_version_info, ) after_time = datetime.utcnow() # Verify assert audit_record.event_type == "correction" assert audit_record.actor == "tim_user_123" assert before_time <= audit_record.timestamp <= after_time assert audit_record.data["original_code"] == "I21.9" assert audit_record.data["corrected_code"] == "I21.0" assert audit_record.data["comment"] == "Correction après relecture" def test_log_tim_correction_without_comment( audit_logger, db_session, sample_stay_metadata, sample_version_info, ): """Test que log_tim_correction fonctionne sans commentaire.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.log_tim_correction( stay_id=sample_stay_metadata.stay_id, original_code="I21.9", corrected_code="I21.0", user_id="user_456", versions=sample_version_info, ) # Verify assert audit_record.data["comment"] is None # Tests pour log_validation (Exigence 10.7) def test_log_validation_records_user_and_status( audit_logger, db_session, sample_stay_metadata, sample_version_info, ): """Test que log_validation enregistre user_id et statut.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.log_validation( stay_id=sample_stay_metadata.stay_id, user_id="user_789", validation_status="accepted", comment="Validation complète", 
versions=sample_version_info, ) # Verify assert audit_record.event_type == "validation" assert audit_record.actor == "tim_user_789" assert audit_record.data["validation_status"] == "accepted" assert audit_record.data["comment"] == "Validation complète" # Tests pour record_documents (Exigence 5.1) def test_record_documents_logs_all_documents( audit_logger, db_session, sample_stay_metadata, sample_documents, sample_version_info, ): """Test que record_documents enregistre tous les documents.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.record_documents( stay_id=sample_stay_metadata.stay_id, documents=sample_documents, versions=sample_version_info, ) # Verify assert audit_record.event_type == "import" assert audit_record.data["document_count"] == 2 assert len(audit_record.data["documents"]) == 2 assert audit_record.data["documents"][0]["document_id"] == "doc_001" assert audit_record.data["documents"][0]["document_type"] == "cr_medical" assert audit_record.data["documents"][1]["document_id"] == "doc_002" # Tests pour record_facts (Exigence 5.2) def test_record_facts_logs_all_facts_with_evidence( audit_logger, db_session, sample_stay_metadata, sample_facts, sample_version_info, ): """Test que record_facts enregistre tous les faits avec preuves.""" # Setup setup_stay_in_db(db_session, sample_stay_metadata) # Execute audit_record = audit_logger.record_facts( stay_id=sample_stay_metadata.stay_id, facts=sample_facts, versions=sample_version_info, ) # Verify assert audit_record.event_type == "import" assert audit_record.data["fact_count"] == 2 assert len(audit_record.data["facts"]) == 2 # Vérifier les statistiques assert "facts_by_type" in audit_record.data assert audit_record.data["facts_by_type"]["diagnostic"] == 2 assert "facts_by_certainty" in audit_record.data assert audit_record.data["facts_by_certainty"]["affirmé"] == 1 assert audit_record.data["facts_by_certainty"]["suspecté"] == 1 # Tests pour 
# Tests for record_codes_with_justifications (Requirement 5.3)


def test_record_codes_logs_all_codes_with_justifications(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_coding_proposal,
    sample_version_info,
):
    """The codes record lists every proposed code with its justification data."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)
    stay_id = sample_stay_metadata.stay_id

    # Act
    record = audit_logger.record_codes_with_justifications(
        stay_id=stay_id,
        proposal=sample_coding_proposal,
        versions=sample_version_info,
    )

    # Assert - record header
    assert record.event_type == "coding"
    assert record.actor == "codeur"

    # Assert - aggregate counters
    payload = record.data
    assert payload["code_counts"]["dp"] == 1
    assert payload["total_codes"] == 1
    assert len(payload["codes"]) == 1

    # Assert - details of the single DP code
    dp_entry = payload["codes"][0]
    assert dp_entry["code"] == "I21.9"
    assert dp_entry["type"] == "dp"
    assert dp_entry["confidence"] == 0.95
    assert dp_entry["evidence_count"] == 1
    assert dp_entry["has_reasoning"] is True


# Tests for record_verification_decision (Requirement 5.5)


def test_record_verification_logs_decision_and_errors(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_verification_result,
    sample_version_info,
):
    """An accepted verification is logged with zero errors and contradictions."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)
    stay_id = sample_stay_metadata.stay_id

    # Act
    record = audit_logger.record_verification_decision(
        stay_id=stay_id,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Assert
    assert record.event_type == "verification"
    assert record.actor == "verificateur"
    payload = record.data
    assert payload["decision"] == "accept"
    assert payload["dim_errors_count"] == 0
    assert payload["contradictions_count"] == 0


def test_record_verification_with_dim_errors(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """A vetoed verification is logged with its DIM errors and contradictions."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)
    stay_id = sample_stay_metadata.stay_id

    blocking_error = DIMError(
        error_type="negated_as_affirmed",
        message="Diagnostic nié codé comme affirmé",
        affected_codes=["I21.9"],
        severity="bloquant",
    )
    verifier_model = ModelVersion(
        model_name="test_model",
        model_tag="v1.0",
        model_digest="f" * 64,
    )
    vetoed = VerificationResult(
        stay_id=stay_id,
        decision="veto",
        dim_errors=[blocking_error],
        contradictions=["Contradiction entre documents"],
        alternatives=[],
        reasoning="Erreur critique détectée",
        model_version=verifier_model,
        prompt_version="v2.0",
    )

    # Act
    record = audit_logger.record_verification_decision(
        stay_id=stay_id,
        verification=vetoed,
        versions=sample_version_info,
    )

    # Assert
    payload = record.data
    assert payload["decision"] == "veto"
    assert payload["dim_errors_count"] == 1
    assert payload["dim_errors"][0]["error_type"] == "negated_as_affirmed"
    assert payload["contradictions_count"] == 1


# Tests for record_component_versions (Requirement 5.7)


def test_record_component_versions_logs_all_versions(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """The component-versions record exposes every version field."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_component_versions(
        stay_id=sample_stay_metadata.stay_id,
        versions=sample_version_info,
        component_name="codeur",
    )

    # Assert
    assert record.event_type == "import"
    payload = record.data
    assert payload["component"] == "codeur"
    assert payload["model"]["name"] == "test_model"
    assert payload["model"]["digest"] == "a" * 64
    assert payload["prompt"]["version"] == "v1.0"
    assert payload["rules"]["version"] == "v1.0"
    assert "cim10" in payload["referentiels"]
    assert payload["referentiels"]["cim10"]["version"] == "2026"
    assert payload["groupage_version"] == "v2026"
    assert payload["inference_params"]["temperature"] == 0.7
# Tests for export_audit_trail (Requirements 5.8, 5.10, 11.4)


def test_export_audit_trail_returns_complete_trail(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_documents,
    sample_facts,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """export_audit_trail returns a complete trail for a populated stay."""
    # Setup - create a complete stay in the DB
    stay_db = setup_stay_in_db(db_session, sample_stay_metadata)

    # Add documents
    for doc in sample_documents:
        doc_db = ClinicalDocumentDB(
            document_id=doc.document_id,
            stay_id=stay_db.id,
            document_type=doc.document_type,
            content=doc.content,
            creation_date=doc.creation_date,
            author=doc.author,
            priority=doc.priority,
        )
        db_session.add(doc_db)

    # Add facts
    for fact in sample_facts:
        fact_db = ClinicalFactDB(
            fact_id=fact.fact_id,
            stay_id=stay_db.id,
            type=fact.type,
            text=fact.text,
            qualifier_certainty=fact.qualifier.certainty,
            qualifier_markers=fact.qualifier.markers,
            qualifier_confidence=fact.qualifier.confidence,
            temporality=fact.temporality,
            confidence=fact.confidence,
            evidence_document_id=fact.evidence.document_id,
            evidence_span_start=fact.evidence.span.start,
            evidence_span_end=fact.evidence.span.end,
            evidence_text=fact.evidence.text,
            evidence_context=fact.evidence.context,
        )
        db_session.add(fact_db)

    db_session.commit()

    # Log a coding decision
    audit_logger.log_coding_decision(
        stay_id=sample_stay_metadata.stay_id,
        proposal=sample_coding_proposal,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Execute
    audit_trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=True,
    )

    # Verify
    assert isinstance(audit_trail, AuditTrail)
    assert audit_trail.stay_id == sample_stay_metadata.stay_id
    assert audit_trail.stay_metadata == sample_stay_metadata
    assert len(audit_trail.documents) == 2
    assert len(audit_trail.facts) == 2
    assert len(audit_trail.audit_records) >= 1

    # Check the main version fields (model_tag may differ)
    assert audit_trail.versions.model_name == sample_version_info.model_name
    assert audit_trail.versions.model_digest == sample_version_info.model_digest
    assert audit_trail.versions.prompt_version == sample_version_info.prompt_version


def test_export_audit_trail_filters_pii_when_requested(
    audit_logger,
    db_session,
    sample_stay_metadata,
):
    """export_audit_trail strips PII from documents when include_pii=False."""
    # Setup - create a stay with PII
    stay_db = setup_stay_in_db(db_session, sample_stay_metadata)

    # Document containing PII
    doc_with_pii = ClinicalDocumentDB(
        document_id="doc_pii",
        stay_id=stay_db.id,
        document_type="cr_medical",
        content="Patient Jean Dupont, né le 15/03/1960, NSS: 1 60 03 75 123 456 78",
        creation_date=datetime(2024, 1, 15),
        author="Dr. Martin",
        priority=1,
    )
    db_session.add(doc_with_pii)
    db_session.commit()

    # Execute - export without PII
    audit_trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=False,
    )

    # Verify - the raw PII values must be gone. Asserting absence directly
    # (rather than "placeholder present OR value absent") ensures the test
    # cannot pass while a raw value is still present next to a placeholder.
    assert len(audit_trail.documents) == 1
    doc_content = audit_trail.documents[0].content
    assert "Jean Dupont" not in doc_content
    assert "15/03/1960" not in doc_content
    assert "1 60 03 75 123 456 78" not in doc_content


def test_export_audit_trail_includes_pii_when_requested(
    audit_logger,
    db_session,
    sample_stay_metadata,
):
    """export_audit_trail keeps PII in documents when include_pii=True."""
    # Setup
    stay_db = setup_stay_in_db(db_session, sample_stay_metadata)

    doc_with_pii = ClinicalDocumentDB(
        document_id="doc_pii",
        stay_id=stay_db.id,
        document_type="cr_medical",
        content="Patient Jean Dupont, né le 15/03/1960",
        creation_date=datetime(2024, 1, 15),
        author="Dr. Martin",
        priority=1,
    )
    db_session.add(doc_with_pii)
    db_session.commit()

    # Execute - export with PII
    audit_trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=True,
    )

    # Verify - PII must be present
    assert len(audit_trail.documents) == 1
    doc_content = audit_trail.documents[0].content
    assert "Jean Dupont" in doc_content
    assert "15/03/1960" in doc_content


def test_export_audit_trail_raises_error_for_nonexistent_stay(
    audit_logger,
):
    """export_audit_trail raises ValueError for an unknown stay."""
    # Execute & Verify
    with pytest.raises(ValueError, match="Stay .* not found"):
        audit_logger.export_audit_trail(
            stay_id="nonexistent_stay",
            include_pii=False,
        )


# Integration tests


def test_complete_audit_workflow(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_documents,
    sample_facts,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """Run the full audit workflow and check every event type is persisted."""
    # Setup
    setup_stay_in_db(db_session, sample_stay_metadata)

    # 1. Record the documents
    audit_logger.record_documents(
        stay_id=sample_stay_metadata.stay_id,
        documents=sample_documents,
        versions=sample_version_info,
    )

    # 2. Record the facts
    audit_logger.record_facts(
        stay_id=sample_stay_metadata.stay_id,
        facts=sample_facts,
        versions=sample_version_info,
    )

    # 3. Record the codes
    audit_logger.record_codes_with_justifications(
        stay_id=sample_stay_metadata.stay_id,
        proposal=sample_coding_proposal,
        versions=sample_version_info,
    )

    # 4. Record the verification
    audit_logger.record_verification_decision(
        stay_id=sample_stay_metadata.stay_id,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # 5. Record a TIM correction
    audit_logger.log_tim_correction(
        stay_id=sample_stay_metadata.stay_id,
        original_code="I21.9",
        corrected_code="I21.0",
        user_id="user_123",
        comment="Précision du diagnostic",
        versions=sample_version_info,
    )

    # 6. Record a validation
    audit_logger.log_validation(
        stay_id=sample_stay_metadata.stay_id,
        user_id="user_123",
        validation_status="accepted",
        comment="Validation après correction",
        versions=sample_version_info,
    )

    # Verify - every record must be present
    from pipeline_mco_pmsi.database.models import AuditRecordDB

    audit_records = (
        db_session.query(AuditRecordDB)
        .join(StayDB)
        .filter(StayDB.stay_id == sample_stay_metadata.stay_id)
        .all()
    )

    # Six logging calls were made above, so at least six records are expected
    assert len(audit_records) >= 6

    # Check the event types
    event_types = [record.event_type for record in audit_records]
    assert "import" in event_types
    assert "coding" in event_types
    assert "verification" in event_types
    assert "correction" in event_types
    assert "validation" in event_types


def _add_operative_report(db_session, stay_db, content="Patient avec HTA"):
    """Insert and commit the ``DOC001`` operative report for ``stay_db``.

    Returns the persisted ``ClinicalDocumentDB`` so callers can reference
    its primary key.
    """
    doc_db = ClinicalDocumentDB(
        stay_id=stay_db.id,
        document_id="DOC001",
        document_type="cr_operatoire",
        content=content,
        creation_date=datetime(2024, 1, 15),
        author="Dr. Smith",
        priority=1,
    )
    db_session.add(doc_db)
    db_session.commit()
    return doc_db


class TestAuditLoggerEncryption:
    """Tests for the encryption of audit exports."""

    def test_export_audit_trail_encrypted(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """An encrypted export is non-empty bytes that hide the plaintext."""
        from pipeline_mco_pmsi.security.encryption import EncryptionKey

        # Create a stay with one document
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        _add_operative_report(db_session, stay_db)

        # Generate an encryption key
        encryption_key = EncryptionKey.generate()

        # Export with encryption
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )

        # The payload must be encrypted bytes
        assert isinstance(encrypted_data, bytes)
        assert len(encrypted_data) > 0

        # The encrypted payload must not contain the original text
        assert b"stay_001" not in encrypted_data
        assert b"HTA" not in encrypted_data

    def test_export_encrypted_can_be_decrypted(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """An encrypted export can be decrypted with the same key."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Create a stay with one document
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        _add_operative_report(db_session, stay_db)

        # Generate an encryption key
        encryption_key = EncryptionKey.generate()

        # Export with encryption
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )

        # Decrypt
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # The decrypted payload must contain the expected information
        assert decrypted_data["stay_id"] == sample_stay_metadata.stay_id
        assert len(decrypted_data["documents"]) == 1
        assert decrypted_data["documents"][0]["document_id"] == "DOC001"

    def test_export_encrypted_with_pii_filtering(
        self, audit_logger, db_session, sample_stay_metadata
    ):
        """Encryption works together with PII filtering."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Create a stay with one document containing PII
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        _add_operative_report(
            db_session,
            stay_db,
            content="Patient Jean Dupont né le 15/03/1959 avec HTA",
        )

        # Generate an encryption key
        encryption_key = EncryptionKey.generate()

        # Export with encryption and PII filtering
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )

        # Decrypt
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # PII must have been filtered out
        content = decrypted_data["documents"][0]["content"]
        assert "Jean Dupont" not in content
        assert "15/03/1959" not in content
        # The PIIProtector uses [NOM_ANONYMISÉ] and [DATE_NAISSANCE]
        assert "[NOM_ANONYMISÉ]" in content or "[DATE_NAISSANCE]" in content

    def test_export_encrypted_different_keys_fail(
        self, audit_logger, db_session, sample_stay_metadata
    ):
        """A different key cannot decrypt the export."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey
        from cryptography.fernet import InvalidToken

        # Create a stay in the database
        setup_stay_in_db(db_session, sample_stay_metadata)

        # Generate two different keys
        encryption_key1 = EncryptionKey.generate()
        encryption_key2 = EncryptionKey.generate()

        # Export with the first key
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key1,
            include_pii=False,
        )

        # Attempt to decrypt with the second key
        encryptor2 = AuditEncryptor(encryption_key2)
        with pytest.raises(InvalidToken):
            encryptor2.decrypt_audit_data(encrypted_data)

    def test_export_encrypted_with_complex_data(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """Encryption round-trips a stay with a document, a fact and a code."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Create a stay with one document
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        doc_db = _add_operative_report(db_session, stay_db)

        # Add a clinical fact
        fact_db = ClinicalFactDB(
            stay_id=stay_db.id,
            fact_id="FACT001",
            type="diagnostic",
            text="Hypertension artérielle",
            qualifier_certainty="affirmé",
            qualifier_markers=[],
            qualifier_confidence=0.95,
            temporality="actuel",
            evidence_document_id="DOC001",
            evidence_span_start=0,
            evidence_span_end=20,
            evidence_text="Patient avec HTA",
            evidence_context=None,
            confidence=0.95,
        )
        db_session.add(fact_db)
        db_session.commit()

        # Add a code
        code_db = CodeDB(
            stay_id=stay_db.id,
            code="I10",
            label="Hypertension essentielle",
            type="dp",
            confidence=0.9,
            reasoning="HTA documentée",
            referentiel_version="2026",
            status="proposed",
            model_name="test_model",
            model_digest="a" * 64,
            prompt_version="v1.0",
        )
        db_session.add(code_db)
        db_session.commit()

        # Add evidence for the code (needs the committed code and document ids)
        evidence_db = EvidenceDB(
            code_id=code_db.id,
            document_id=doc_db.id,
            span_start=0,
            span_end=20,
            text="Patient avec HTA",
            context=None,
        )
        db_session.add(evidence_db)
        db_session.commit()

        # Generate an encryption key
        encryption_key = EncryptionKey.generate()

        # Export with encryption
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )

        # Decrypt
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # All the data must be present
        assert decrypted_data["stay_id"] == sample_stay_metadata.stay_id
        assert len(decrypted_data["documents"]) == 1
        assert len(decrypted_data["facts"]) == 1
        assert decrypted_data["facts"][0]["fact_id"] == "FACT001"
        # The code lives in coding_proposal, which may be None if it was not
        # loaded correctly, so only the presence of the key is checked.
        assert "coding_proposal" in decrypted_data


# Script entry point, placed after every definition in this block.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])