""" Validation des données extraites. Vérifie les formats, la cohérence, et signale les anomalies. Applique un auto-fix en safety-net via le normalizer avant validation. """ import re import logging from dataclasses import dataclass, field from config import DECISION_VALUES, TYPE_DESACCORD_VALUES from extractor.normalizer import normalize_extraction logger = logging.getLogger(__name__) # Patterns de codes médicaux CIM10_PATTERN = re.compile(r'^[A-Z]\d{2}(?:\.\d{1,2})?$') CCAM_PATTERN = re.compile(r'^[A-Z]{4}\d{3}(?:-\d)?$') @dataclass class ValidationResult: """Résultat de validation d'une extraction.""" is_valid: bool warnings: list # list[str] errors: list # list[str] fixes: list = field(default_factory=list) # list[str] — auto-corrections appliquées def _validate_codes(codes_str: str | None, field_name: str) -> list[str]: """Valide une chaîne de codes CIM-10/CCAM.""" warnings = [] if not codes_str: return warnings codes = [c.strip() for c in codes_str.split(',')] for code in codes: if not code: continue is_cim10 = CIM10_PATTERN.match(code) is_ccam = CCAM_PATTERN.match(code) if not is_cim10 and not is_ccam: warnings.append(f"{field_name} : code '{code}' ne correspond ni à CIM-10 ni à CCAM") return warnings def validate_extraction(extraction) -> ValidationResult: """ Valide une extraction OGC. Applique d'abord un auto-fix en safety-net via le normalizer, puis retourne les warnings et erreurs détectés. """ warnings = [] errors = [] fixes = [] # Vérifier l'extraction elle-même if not extraction.extraction_success: errors.append(f"Extraction échouée : {extraction.error_message}") return ValidationResult(is_valid=False, warnings=warnings, errors=errors) # Safety-net : auto-fix via normalizer avant validation fixes = normalize_extraction(extraction) # Vérifier la décision if extraction.decision_ucr and extraction.decision_ucr not in DECISION_VALUES: warnings.append(f"Décision non standard : '{extraction.decision_ucr}'") # Vérifier le type de désaccord if extraction.type_desaccord and extraction.type_desaccord not in TYPE_DESACCORD_VALUES: warnings.append(f"Type désaccord non standard : '{extraction.type_desaccord}'") # Vérifier les codes warnings.extend(_validate_codes(extraction.codes_etablissement, "codes_etablissement")) warnings.extend(_validate_codes(extraction.codes_controleurs, "codes_controleurs")) warnings.extend(_validate_codes(extraction.codes_retenus, "codes_retenus")) # Vérifier la cohérence décision / codes retenus if extraction.decision_ucr == "Défavorable" and not extraction.codes_retenus: if extraction.codes_controleurs: warnings.append("Décision défavorable mais codes_retenus vide — les codes contrôleurs devraient être retenus") # Vérifier que le texte de décision n'est pas vide if not extraction.texte_decision or len(extraction.texte_decision.strip()) < 20: warnings.append("Texte de décision absent ou très court") is_valid = len(errors) == 0 return ValidationResult(is_valid=is_valid, warnings=warnings, errors=errors, fixes=fixes) def validate_all(extractions: list) -> dict: """ Valide toutes les extractions et retourne un rapport. Inclut les auto-corrections appliquées par le safety-net. """ total = len(extractions) valid = 0 with_warnings = 0 failed = 0 all_warnings = [] all_errors = [] all_fixes = [] for ext in extractions: result = validate_extraction(ext) # Collecter les auto-corrections if result.fixes: for f in result.fixes: fix_msg = f"OGC {ext.num_ogc} (Champ {ext.champ}) : {f}" all_fixes.append(fix_msg) logger.info(f" 🔧 {fix_msg}") if result.is_valid: valid += 1 if result.warnings: with_warnings += 1 for w in result.warnings: all_warnings.append(f"OGC {ext.num_ogc} (Champ {ext.champ}) : {w}") else: failed += 1 for e in result.errors: all_errors.append(f"OGC {ext.num_ogc} (Champ {ext.champ}) : {e}") report = { "total": total, "valid": valid, "with_warnings": with_warnings, "failed": failed, "warnings": all_warnings, "errors": all_errors, "fixes": all_fixes, "total_fixes": len(all_fixes), } logger.info(f"Validation : {valid}/{total} OK, {with_warnings} avec warnings, {failed} échoués") if all_fixes: logger.info(f" {len(all_fixes)} auto-corrections appliquées par le safety-net") if all_warnings: for w in all_warnings[:10]: # Limiter l'affichage logger.warning(f" ⚠ {w}") if len(all_warnings) > 10: logger.warning(f" ... et {len(all_warnings) - 10} autres warnings") if all_errors: for e in all_errors: logger.error(f" ✗ {e}") return report