Files
aivanov_CIM/tests/test_audit_logger.py
2026-03-05 01:20:14 +01:00

1145 lines
36 KiB
Python

"""
Tests unitaires pour l'AuditLogger.
Ces tests vérifient que l'AuditLogger enregistre correctement tous les éléments
nécessaires pour la traçabilité complète, et que les exports filtrent correctement
les DIP.
Exigences testées : 5.1, 5.2, 5.3, 5.5, 5.6, 5.7, 5.8, 5.10, 11.4
"""
import pytest
from datetime import datetime, timedelta
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from pipeline_mco_pmsi.audit.audit_logger import AuditLogger, AuditTrail
from pipeline_mco_pmsi.database.base import Base
from pipeline_mco_pmsi.database.models import (
StayDB,
ClinicalDocumentDB,
ClinicalFactDB,
CodeDB,
EvidenceDB,
VerificationResultDB,
ValidationIssueDB,
QuestionDB,
GroupageResultDB,
TIMCorrectionDB,
)
from pipeline_mco_pmsi.models.clinical import (
ClinicalDocument,
ClinicalFact,
Evidence,
Qualifier,
Span,
)
from pipeline_mco_pmsi.models.coding import (
Code,
CodingProposal,
DIMError,
VerificationResult,
)
from pipeline_mco_pmsi.models.metadata import (
ModelVersion,
ReferentielVersion,
StayMetadata,
VersionInfo,
)
from pipeline_mco_pmsi.processors.pii_protector import PIIProtector
@pytest.fixture
def db_session():
    """Yield a SQLAlchemy session bound to an in-memory SQLite database.

    The schema is created from ``Base.metadata`` before the session is handed
    to the test. On teardown the session is closed and the engine is disposed
    so the pooled SQLite connection is released explicitly instead of
    lingering until garbage collection.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session_factory = sessionmaker(bind=engine)
    session = session_factory()
    try:
        yield session
    finally:
        # Close the session first, then drop the engine's connection pool.
        session.close()
        engine.dispose()
@pytest.fixture
def pii_protector():
    """Provide the PIIProtector instance shared by the audit tests."""
    # NER is switched off to keep the tests fast.
    protector = PIIProtector(use_ner=False)
    return protector
@pytest.fixture
def audit_logger(db_session, pii_protector):
    """Provide an AuditLogger built on the test session and PII protector."""
    collaborators = {
        "db_session": db_session,
        "pii_protector": pii_protector,
    }
    return AuditLogger(**collaborators)
@pytest.fixture
def sample_stay_metadata():
    """Provide metadata for stay ``stay_001`` (15 to 20 January 2024)."""
    admitted_on = datetime(2024, 1, 15)
    discharged_on = datetime(2024, 1, 20)
    metadata = StayMetadata(
        stay_id="stay_001",
        admission_date=admitted_on,
        discharge_date=discharged_on,
        specialty="Cardiologie",
        unit="USI",
        age=65,
        sex="M",
    )
    return metadata
@pytest.fixture
def sample_version_info():
    """Provide a VersionInfo populated with 64-character placeholder digests."""

    def hex64(letter):
        # Every digest in this fixture is one letter repeated 64 times.
        return letter * 64

    cim10_referentiel = ReferentielVersion(
        type="cim10",
        version="2026",
        import_date=datetime(2024, 1, 1),
        file_hash=hex64("c"),
        chunk_count=1000,
        index_hash=hex64("d"),
    )
    sampling_params = {"temperature": 0.7, "top_p": 0.9}

    return VersionInfo(
        model_name="test_model",
        model_tag="v1.0",
        model_digest=hex64("a"),
        prompt_version="v1.0",
        prompt_hash=hex64("b"),
        referentiels={"cim10": cim10_referentiel},
        rules_version="v1.0",
        rules_hash=hex64("e"),
        groupage_version="v2026",
        inference_params=sampling_params,
    )
@pytest.fixture
def sample_documents():
    """Provide two clinical documents: a medical report and an imaging report."""
    medical_report = ClinicalDocument(
        document_id="doc_001",
        document_type="cr_medical",
        content="Patient Jean Dupont, né le 15/03/1960. Diagnostic: Infarctus du myocarde.",
        creation_date=datetime(2024, 1, 15, 10, 0),
        author="Dr. Martin",
        priority=1,
    )
    imaging_report = ClinicalDocument(
        document_id="doc_002",
        document_type="imagerie",
        content="Échographie cardiaque: Fraction d'éjection à 45%.",
        creation_date=datetime(2024, 1, 16, 14, 30),
        author="Dr. Leroy",
        priority=3,
    )
    return [medical_report, imaging_report]
@pytest.fixture
def sample_facts():
    """Provide two diagnostic facts: one affirmed, one suspected."""
    affirmed_fact = ClinicalFact(
        fact_id="fact_001",
        type="diagnostic",
        text="Infarctus du myocarde",
        qualifier=Qualifier(certainty="affirmé", markers=[], confidence=0.95),
        temporality="actuel",
        evidence=Evidence(
            document_id="doc_001",
            span=Span(start=50, end=72),
            text="Infarctus du myocarde",
            context="Diagnostic: Infarctus du myocarde.",
        ),
        confidence=0.95,
    )
    suspected_fact = ClinicalFact(
        fact_id="fact_002",
        type="diagnostic",
        text="Insuffisance cardiaque",
        qualifier=Qualifier(
            certainty="suspecté",
            markers=["possible"],
            confidence=0.6,
        ),
        temporality="actuel",
        evidence=Evidence(
            document_id="doc_002",
            span=Span(start=30, end=50),
            text="Fraction d'éjection à 45%",
            context="Échographie: Fraction d'éjection à 45%.",
        ),
        confidence=0.6,
    )
    return [affirmed_fact, suspected_fact]
@pytest.fixture
def sample_coding_proposal(sample_facts):
    """Provide a coding proposal whose only code is a principal diagnosis.

    The DP code reuses the evidence of the first fact in ``sample_facts``;
    DR is ``None`` and the DAS and CCAM lists are empty.
    """
    supporting_evidence = sample_facts[0].evidence
    principal_diagnosis = Code(
        code="I21.9",
        label="Infarctus aigu du myocarde, sans précision",
        type="dp",
        evidence=[supporting_evidence],
        confidence=0.95,
        reasoning="Code basé sur le diagnostic d'infarctus du myocarde",
        referentiel_version="2026",
    )
    coder_model = ModelVersion(
        model_name="test_model",
        model_tag="v1.0",
        model_digest="a" * 64,
    )
    return CodingProposal(
        stay_id="stay_001",
        dp=principal_diagnosis,
        dr=None,
        das=[],
        ccam=[],
        reasoning="Proposition basée sur les faits cliniques extraits",
        model_version=coder_model,
        prompt_version="v1.0",
    )
@pytest.fixture
def sample_verification_result():
    """Provide an ``accept`` verification result with no errors attached."""
    verifier_model = ModelVersion(
        model_name="test_model_verificateur",
        model_tag="v1.0",
        model_digest="f" * 64,
    )
    result = VerificationResult(
        stay_id="stay_001",
        decision="accept",
        dim_errors=[],
        contradictions=[],
        alternatives=[],
        reasoning="Proposition validée sans erreur détectée",
        model_version=verifier_model,
        # Deliberately different from the coder's prompt version.
        prompt_version="v2.0",
    )
    return result
def setup_stay_in_db(db_session, stay_metadata):
    """Persist a StayDB row copied from ``stay_metadata`` and return it.

    The row is added to ``db_session`` and committed before being returned.
    """
    copied_fields = (
        "stay_id",
        "admission_date",
        "discharge_date",
        "specialty",
        "unit",
        "age",
        "sex",
    )
    column_values = {name: getattr(stay_metadata, name) for name in copied_fields}
    row = StayDB(**column_values)
    db_session.add(row)
    db_session.commit()
    return row
# Tests pour log_coding_decision (Exigence 5.1, 5.3, 5.5, 5.7)
def test_log_coding_decision_creates_audit_record(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """log_coding_decision returns a record describing the coding event."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)
    expected_stay_id = sample_stay_metadata.stay_id

    # Act
    record = audit_logger.log_coding_decision(
        stay_id=expected_stay_id,
        proposal=sample_coding_proposal,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Assert
    assert record.stay_id == expected_stay_id
    assert record.event_type == "coding"
    assert record.actor == "system"
    for expected_key in ("proposal", "verification"):
        assert expected_key in record.data
    assert record.versions == sample_version_info
def test_log_coding_decision_persists_to_database(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """log_coding_decision stores a matching AuditRecordDB row."""
    from pipeline_mco_pmsi.database.models import AuditRecordDB

    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.log_coding_decision(
        stay_id=sample_stay_metadata.stay_id,
        proposal=sample_coding_proposal,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Assert: read the row back from the database by its record id.
    same_record = AuditRecordDB.record_id == record.record_id
    stored_row = db_session.query(AuditRecordDB).filter(same_record).first()

    assert stored_row is not None
    assert stored_row.event_type == "coding"
    assert stored_row.model_name == sample_version_info.model_name
    assert stored_row.prompt_version == sample_version_info.prompt_version
# Tests pour log_tim_correction (Exigence 5.6)
def test_log_tim_correction_records_user_and_timestamp(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """log_tim_correction records the acting user, a timestamp and both codes."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act: bracket the call with two clock readings to bound the timestamp.
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
    # kept because the record timestamp is presumably naive UTC - confirm
    # against AuditLogger before switching to an aware datetime.
    window_start = datetime.utcnow()
    record = audit_logger.log_tim_correction(
        stay_id=sample_stay_metadata.stay_id,
        original_code="I21.9",
        corrected_code="I21.0",
        user_id="user_123",
        comment="Correction après relecture",
        versions=sample_version_info,
    )
    window_end = datetime.utcnow()

    # Assert
    assert record.event_type == "correction"
    assert record.actor == "tim_user_123"
    assert window_start <= record.timestamp <= window_end
    expected_payload = (
        ("original_code", "I21.9"),
        ("corrected_code", "I21.0"),
        ("comment", "Correction après relecture"),
    )
    for key, expected_value in expected_payload:
        assert record.data[key] == expected_value
def test_log_tim_correction_without_comment(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """log_tim_correction stores a ``None`` comment when none is supplied."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act: the ``comment`` argument is intentionally omitted.
    correction_args = {
        "stay_id": sample_stay_metadata.stay_id,
        "original_code": "I21.9",
        "corrected_code": "I21.0",
        "user_id": "user_456",
        "versions": sample_version_info,
    }
    record = audit_logger.log_tim_correction(**correction_args)

    # Assert
    assert record.data["comment"] is None
# Tests pour log_validation (Exigence 10.7)
def test_log_validation_records_user_and_status(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """log_validation records the acting user, the status and the comment."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.log_validation(
        stay_id=sample_stay_metadata.stay_id,
        user_id="user_789",
        validation_status="accepted",
        comment="Validation complète",
        versions=sample_version_info,
    )

    # Assert
    assert record.event_type == "validation"
    assert record.actor == "tim_user_789"
    assert record.data["validation_status"] == "accepted"
    assert record.data["comment"] == "Validation complète"
# Tests pour record_documents (Exigence 5.1)
def test_record_documents_logs_all_documents(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_documents,
    sample_version_info,
):
    """record_documents logs one entry per document, in input order."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_documents(
        stay_id=sample_stay_metadata.stay_id,
        documents=sample_documents,
        versions=sample_version_info,
    )

    # Assert
    assert record.event_type == "import"
    assert record.data["document_count"] == 2
    logged_documents = record.data["documents"]
    assert len(logged_documents) == 2
    first_logged, second_logged = logged_documents[0], logged_documents[1]
    assert first_logged["document_id"] == "doc_001"
    assert first_logged["document_type"] == "cr_medical"
    assert second_logged["document_id"] == "doc_002"
# Tests pour record_facts (Exigence 5.2)
def test_record_facts_logs_all_facts_with_evidence(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_facts,
    sample_version_info,
):
    """record_facts logs every fact plus per-type and per-certainty counts."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_facts(
        stay_id=sample_stay_metadata.stay_id,
        facts=sample_facts,
        versions=sample_version_info,
    )

    # Assert
    payload = record.data
    assert record.event_type == "import"
    assert payload["fact_count"] == 2
    assert len(payload["facts"]) == 2

    # Aggregated statistics
    assert "facts_by_type" in payload
    assert payload["facts_by_type"]["diagnostic"] == 2
    assert "facts_by_certainty" in payload
    certainty_counts = payload["facts_by_certainty"]
    assert certainty_counts["affirmé"] == 1
    assert certainty_counts["suspecté"] == 1
# Tests pour record_codes_with_justifications (Exigence 5.3)
def test_record_codes_logs_all_codes_with_justifications(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_coding_proposal,
    sample_version_info,
):
    """record_codes_with_justifications logs each code and its justification."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_codes_with_justifications(
        stay_id=sample_stay_metadata.stay_id,
        proposal=sample_coding_proposal,
        versions=sample_version_info,
    )

    # Assert: record-level fields
    assert record.event_type == "coding"
    assert record.actor == "codeur"
    assert record.data["code_counts"]["dp"] == 1
    assert record.data["total_codes"] == 1
    assert len(record.data["codes"]) == 1

    # Assert: the single logged code
    logged_code = record.data["codes"][0]
    assert logged_code["code"] == "I21.9"
    assert logged_code["type"] == "dp"
    assert logged_code["confidence"] == 0.95
    assert logged_code["evidence_count"] == 1
    assert logged_code["has_reasoning"] is True
# Tests pour record_verification_decision (Exigence 5.5)
def test_record_verification_logs_decision_and_errors(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_verification_result,
    sample_version_info,
):
    """record_verification_decision logs the decision and zero error counts."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_verification_decision(
        stay_id=sample_stay_metadata.stay_id,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Assert
    assert record.event_type == "verification"
    assert record.actor == "verificateur"
    expected_payload = (
        ("decision", "accept"),
        ("dim_errors_count", 0),
        ("contradictions_count", 0),
    )
    for key, expected_value in expected_payload:
        assert record.data[key] == expected_value
def test_record_verification_with_dim_errors(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """record_verification_decision logs DIM errors and contradictions."""
    # Arrange: a veto carrying one blocking DIM error and one contradiction.
    setup_stay_in_db(db_session, sample_stay_metadata)
    blocking_error = DIMError(
        error_type="negated_as_affirmed",
        message="Diagnostic nié codé comme affirmé",
        affected_codes=["I21.9"],
        severity="bloquant",
    )
    verifier_model = ModelVersion(
        model_name="test_model",
        model_tag="v1.0",
        model_digest="f" * 64,
    )
    vetoed_verification = VerificationResult(
        stay_id=sample_stay_metadata.stay_id,
        decision="veto",
        dim_errors=[blocking_error],
        contradictions=["Contradiction entre documents"],
        alternatives=[],
        reasoning="Erreur critique détectée",
        model_version=verifier_model,
        prompt_version="v2.0",
    )

    # Act
    record = audit_logger.record_verification_decision(
        stay_id=sample_stay_metadata.stay_id,
        verification=vetoed_verification,
        versions=sample_version_info,
    )

    # Assert
    payload = record.data
    assert payload["decision"] == "veto"
    assert payload["dim_errors_count"] == 1
    assert payload["dim_errors"][0]["error_type"] == "negated_as_affirmed"
    assert payload["contradictions_count"] == 1
# Tests pour record_component_versions (Exigence 5.7)
def test_record_component_versions_logs_all_versions(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_version_info,
):
    """record_component_versions logs model, prompt, rules and referentiels."""
    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)

    # Act
    record = audit_logger.record_component_versions(
        stay_id=sample_stay_metadata.stay_id,
        versions=sample_version_info,
        component_name="codeur",
    )

    # Assert
    logged = record.data
    assert record.event_type == "import"
    assert logged["component"] == "codeur"
    assert logged["model"]["name"] == "test_model"
    assert logged["model"]["digest"] == "a" * 64
    assert logged["prompt"]["version"] == "v1.0"
    assert logged["rules"]["version"] == "v1.0"
    assert "cim10" in logged["referentiels"]
    assert logged["referentiels"]["cim10"]["version"] == "2026"
    assert logged["groupage_version"] == "v2026"
    assert logged["inference_params"]["temperature"] == 0.7
# Tests pour export_audit_trail (Exigence 5.8, 5.10, 11.4)
def test_export_audit_trail_returns_complete_trail(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_documents,
    sample_facts,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """export_audit_trail returns documents, facts, records and versions."""
    # Arrange: persist the stay together with its documents and facts.
    stay_row = setup_stay_in_db(db_session, sample_stay_metadata)

    def to_document_row(doc):
        # Map a ClinicalDocument onto its database row for this stay.
        return ClinicalDocumentDB(
            document_id=doc.document_id,
            stay_id=stay_row.id,
            document_type=doc.document_type,
            content=doc.content,
            creation_date=doc.creation_date,
            author=doc.author,
            priority=doc.priority,
        )

    def to_fact_row(fact):
        # Flatten the nested qualifier/evidence objects into row columns.
        qualifier, evidence = fact.qualifier, fact.evidence
        return ClinicalFactDB(
            fact_id=fact.fact_id,
            stay_id=stay_row.id,
            type=fact.type,
            text=fact.text,
            qualifier_certainty=qualifier.certainty,
            qualifier_markers=qualifier.markers,
            qualifier_confidence=qualifier.confidence,
            temporality=fact.temporality,
            confidence=fact.confidence,
            evidence_document_id=evidence.document_id,
            evidence_span_start=evidence.span.start,
            evidence_span_end=evidence.span.end,
            evidence_text=evidence.text,
            evidence_context=evidence.context,
        )

    db_session.add_all([to_document_row(doc) for doc in sample_documents])
    db_session.add_all([to_fact_row(fact) for fact in sample_facts])
    db_session.commit()

    # Log one coding decision so the trail holds at least one audit record.
    audit_logger.log_coding_decision(
        stay_id=sample_stay_metadata.stay_id,
        proposal=sample_coding_proposal,
        verification=sample_verification_result,
        versions=sample_version_info,
    )

    # Act
    trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=True,
    )

    # Assert
    assert isinstance(trail, AuditTrail)
    assert trail.stay_id == sample_stay_metadata.stay_id
    assert trail.stay_metadata == sample_stay_metadata
    assert len(trail.documents) == 2
    assert len(trail.facts) == 2
    assert len(trail.audit_records) >= 1

    # Only the main version fields are compared (model_tag may differ).
    for field_name in ("model_name", "model_digest", "prompt_version"):
        exported_value = getattr(trail.versions, field_name)
        assert exported_value == getattr(sample_version_info, field_name)
def test_export_audit_trail_filters_pii_when_requested(
    audit_logger,
    db_session,
    sample_stay_metadata,
):
    """export_audit_trail strips raw PII from documents when include_pii=False.

    The check asserts that every raw identifier is absent from the exported
    content. A "placeholder present OR identifier absent" check would let a
    leak through whenever a placeholder appears alongside the raw value.
    """
    # Arrange: one document carrying a name, a birth date and an NSS.
    patient_name = "Jean Dupont"
    birth_date = "15/03/1960"
    nss = "1 60 03 75 123 456 78"
    stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
    doc_with_pii = ClinicalDocumentDB(
        document_id="doc_pii",
        stay_id=stay_db.id,
        document_type="cr_medical",
        content="Patient Jean Dupont, né le 15/03/1960, NSS: 1 60 03 75 123 456 78",
        creation_date=datetime(2024, 1, 15),
        author="Dr. Martin",
        priority=1,
    )
    db_session.add(doc_with_pii)
    db_session.commit()

    # Act: export without PII.
    audit_trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=False,
    )

    # Assert: none of the raw identifiers survives in the exported content.
    assert len(audit_trail.documents) == 1
    doc_content = audit_trail.documents[0].content
    for raw_identifier in (patient_name, birth_date, nss):
        assert raw_identifier not in doc_content, (
            f"raw identifier {raw_identifier!r} leaked into the PII-filtered export"
        )
def test_export_audit_trail_includes_pii_when_requested(
    audit_logger,
    db_session,
    sample_stay_metadata,
):
    """export_audit_trail leaves PII in place when include_pii=True."""
    # Arrange
    stay_row = setup_stay_in_db(db_session, sample_stay_metadata)
    db_session.add(
        ClinicalDocumentDB(
            document_id="doc_pii",
            stay_id=stay_row.id,
            document_type="cr_medical",
            content="Patient Jean Dupont, né le 15/03/1960",
            creation_date=datetime(2024, 1, 15),
            author="Dr. Martin",
            priority=1,
        )
    )
    db_session.commit()

    # Act: export with PII.
    trail = audit_logger.export_audit_trail(
        stay_id=sample_stay_metadata.stay_id,
        include_pii=True,
    )

    # Assert: both identifiers are still present in the exported content.
    assert len(trail.documents) == 1
    exported_text = trail.documents[0].content
    assert "Jean Dupont" in exported_text
    assert "15/03/1960" in exported_text
def test_export_audit_trail_raises_error_for_nonexistent_stay(
    audit_logger,
):
    """export_audit_trail raises ValueError for an unknown stay id."""
    unknown_stay_id = "nonexistent_stay"
    # Act & Assert
    with pytest.raises(ValueError, match="Stay .* not found"):
        audit_logger.export_audit_trail(stay_id=unknown_stay_id, include_pii=False)
# Tests d'intégration
def test_complete_audit_workflow(
    audit_logger,
    db_session,
    sample_stay_metadata,
    sample_documents,
    sample_facts,
    sample_coding_proposal,
    sample_verification_result,
    sample_version_info,
):
    """Run every logging step for one stay and check the stored records."""
    from pipeline_mco_pmsi.database.models import AuditRecordDB

    # Arrange
    setup_stay_in_db(db_session, sample_stay_metadata)
    stay_id = sample_stay_metadata.stay_id
    versions = sample_version_info

    # 1. Documents
    audit_logger.record_documents(
        stay_id=stay_id, documents=sample_documents, versions=versions
    )
    # 2. Facts
    audit_logger.record_facts(stay_id=stay_id, facts=sample_facts, versions=versions)
    # 3. Codes
    audit_logger.record_codes_with_justifications(
        stay_id=stay_id, proposal=sample_coding_proposal, versions=versions
    )
    # 4. Verification
    audit_logger.record_verification_decision(
        stay_id=stay_id, verification=sample_verification_result, versions=versions
    )
    # 5. TIM correction
    audit_logger.log_tim_correction(
        stay_id=stay_id,
        original_code="I21.9",
        corrected_code="I21.0",
        user_id="user_123",
        comment="Précision du diagnostic",
        versions=versions,
    )
    # 6. Validation
    audit_logger.log_validation(
        stay_id=stay_id,
        user_id="user_123",
        validation_status="accepted",
        comment="Validation après correction",
        versions=versions,
    )

    # Assert: fetch every audit record attached to the stay.
    records_query = (
        db_session.query(AuditRecordDB)
        .join(StayDB)
        .filter(StayDB.stay_id == sample_stay_metadata.stay_id)
    )
    stored_records = records_query.all()

    # Six logging calls were made above, so at least six records are expected.
    assert len(stored_records) >= 6

    # Every event type produced by the workflow must be represented.
    seen_event_types = [row.event_type for row in stored_records]
    for expected_type in (
        "import",
        "coding",
        "verification",
        "correction",
        "validation",
    ):
        assert expected_type in seen_event_types
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file as a script
    # reports failures to the calling shell instead of always exiting 0.
    raise SystemExit(pytest.main([__file__, "-v"]))
class TestAuditLoggerEncryption:
    """Tests for encrypted audit-trail exports.

    Stay rows are created through the module-level ``setup_stay_in_db`` helper
    and the recurring ``DOC001`` document through ``_add_document``, so each
    test only spells out the data that is specific to it.
    """

    @staticmethod
    def _add_document(db_session, stay_db, content="Patient avec HTA"):
        """Persist the ``DOC001`` operative report for ``stay_db`` and return it.

        Args:
            db_session: Session used to add and commit the row.
            stay_db: Persisted stay row; its ``id`` is used as the foreign key.
            content: Text body stored in the document.

        Returns:
            The committed ClinicalDocumentDB row.
        """
        doc_db = ClinicalDocumentDB(
            stay_id=stay_db.id,
            document_id="DOC001",
            document_type="cr_operatoire",
            content=content,
            creation_date=datetime(2024, 1, 15),
            author="Dr. Smith",
            priority=1,
        )
        db_session.add(doc_db)
        db_session.commit()
        return doc_db

    def test_export_audit_trail_encrypted(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """The encrypted export is non-empty bytes without the plaintext."""
        from pipeline_mco_pmsi.security.encryption import EncryptionKey

        # Arrange: a stay with one document.
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        self._add_document(db_session, stay_db)
        encryption_key = EncryptionKey.generate()

        # Act
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )

        # Assert: the payload is opaque bytes.
        assert isinstance(encrypted_data, bytes)
        assert len(encrypted_data) > 0
        # Neither the stay id nor the document text may appear in clear.
        assert sample_stay_metadata.stay_id.encode() not in encrypted_data
        assert b"HTA" not in encrypted_data

    def test_export_encrypted_can_be_decrypted(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """Data encrypted with a key can be decrypted with the same key."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Arrange: a stay with one document.
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        self._add_document(db_session, stay_db)
        encryption_key = EncryptionKey.generate()

        # Act: export, then decrypt with the same key.
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # Assert: the decrypted payload carries the expected content.
        assert decrypted_data["stay_id"] == sample_stay_metadata.stay_id
        assert len(decrypted_data["documents"]) == 1
        assert decrypted_data["documents"][0]["document_id"] == "DOC001"

    def test_export_encrypted_with_pii_filtering(
        self, audit_logger, db_session, sample_stay_metadata
    ):
        """Encryption composes with PII filtering."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Arrange: a stay whose document contains a name and a birth date.
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        self._add_document(
            db_session,
            stay_db,
            content="Patient Jean Dupont né le 15/03/1959 avec HTA",
        )
        encryption_key = EncryptionKey.generate()

        # Act: export with encryption and PII filtering, then decrypt.
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # Assert: the raw identifiers are gone from the decrypted content.
        content = decrypted_data["documents"][0]["content"]
        assert "Jean Dupont" not in content
        assert "15/03/1959" not in content
        # PIIProtector uses the [NOM_ANONYMISÉ] and [DATE_NAISSANCE] placeholders.
        assert "[NOM_ANONYMISÉ]" in content or "[DATE_NAISSANCE]" in content

    def test_export_encrypted_different_keys_fail(
        self, audit_logger, db_session, sample_stay_metadata
    ):
        """A different key cannot decrypt the export."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey
        from cryptography.fernet import InvalidToken

        # Arrange: a stay (no documents) and two independent keys.
        setup_stay_in_db(db_session, sample_stay_metadata)
        encryption_key1 = EncryptionKey.generate()
        encryption_key2 = EncryptionKey.generate()

        # Act: export with the first key.
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key1,
            include_pii=False,
        )

        # Assert: decrypting with the second key is rejected.
        encryptor2 = AuditEncryptor(encryption_key2)
        with pytest.raises(InvalidToken):
            encryptor2.decrypt_audit_data(encrypted_data)

    def test_export_encrypted_with_complex_data(
        self, audit_logger, db_session, sample_stay_metadata, sample_version_info
    ):
        """Encryption works with documents, facts, codes and evidence present."""
        from pipeline_mco_pmsi.security.encryption import AuditEncryptor, EncryptionKey

        # Arrange: a stay with one document.
        stay_db = setup_stay_in_db(db_session, sample_stay_metadata)
        doc_db = self._add_document(db_session, stay_db)

        # A clinical fact.
        fact_db = ClinicalFactDB(
            stay_id=stay_db.id,
            fact_id="FACT001",
            type="diagnostic",
            text="Hypertension artérielle",
            qualifier_certainty="affirmé",
            qualifier_markers=[],
            qualifier_confidence=0.95,
            temporality="actuel",
            evidence_document_id="DOC001",
            evidence_span_start=0,
            evidence_span_end=20,
            evidence_text="Patient avec HTA",
            evidence_context=None,
            confidence=0.95,
        )
        db_session.add(fact_db)
        db_session.commit()

        # A code; committed before the evidence row, which needs its id.
        code_db = CodeDB(
            stay_id=stay_db.id,
            code="I10",
            label="Hypertension essentielle",
            type="dp",
            confidence=0.9,
            reasoning="HTA documentée",
            referentiel_version="2026",
            status="proposed",
            model_name="test_model",
            model_digest="a" * 64,
            prompt_version="v1.0",
        )
        db_session.add(code_db)
        db_session.commit()

        # Evidence linking the code to the document.
        evidence_db = EvidenceDB(
            code_id=code_db.id,
            document_id=doc_db.id,
            span_start=0,
            span_end=20,
            text="Patient avec HTA",
            context=None,
        )
        db_session.add(evidence_db)
        db_session.commit()

        encryption_key = EncryptionKey.generate()

        # Act: export with encryption, then decrypt.
        encrypted_data = audit_logger.export_audit_trail_encrypted(
            stay_id=sample_stay_metadata.stay_id,
            encryption_key=encryption_key,
            include_pii=False,
        )
        encryptor = AuditEncryptor(encryption_key)
        decrypted_data = encryptor.decrypt_audit_data(encrypted_data)

        # Assert: documents and facts are present in the decrypted payload.
        assert decrypted_data["stay_id"] == sample_stay_metadata.stay_id
        assert len(decrypted_data["documents"]) == 1
        assert len(decrypted_data["facts"]) == 1
        assert decrypted_data["facts"][0]["fact_id"] == "FACT001"
        # The code lives under coding_proposal, which may be None if it was
        # not loaded; only the presence of the key is checked.
        assert "coding_proposal" in decrypted_data