Files
rpa_vision_v3/tests/unit/test_input_validation.py
Dom cf495dd82f feat: chat unifié, GestureCatalog, Copilot, Léa UI, extraction données, vérification replay
Refonte majeure du système Agent Chat et ajout de nombreux modules :

- Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat
  avec résolution en 3 niveaux (workflow → geste → "montre-moi")
- GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique,
  substitution automatique dans les replays, et endpoint /api/gestures
- Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket
  (approve/skip/abort) avant chaque action
- Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent
  pour feedback visuel pendant le replay
- Data Extraction (core/extraction/) : moteur d'extraction visuelle de données
  (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel
- ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison
  de screenshots, avec logique de retry (max 3)
- IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés
- Dashboard : nouvelles pages gestures, streaming, extractions
- Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants
- Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410,
  suppression du code hardcodé _plan_to_replay_actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 10:02:09 +01:00

335 lines
12 KiB
Python

"""
Tests pour le système de validation des entrées utilisateur.
Exigence 7.2: Protection contre les injections SQL/NoSQL
Exigence 7.3: Validation des chemins de fichiers
Exigence 7.4: Sanitization des données loggées
"""
import pytest
import sys
import os
import re
import html
import json
from pathlib import Path
from typing import Any, List, Optional
from dataclasses import dataclass
# Ajouter le répertoire racine au path pour les imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# Import direct des composants nécessaires
from core.security.security_config import get_security_config, hash_sensitive_value
@dataclass
class ValidationResult:
    """Outcome of validating one input value.

    ``errors`` and ``warnings`` may be passed as ``None``; they are then
    replaced by empty lists right after construction.
    """

    is_valid: bool  # whether the input passed validation
    sanitized_value: Any  # cleaned-up value handed back to the caller
    errors: List[str]  # messages describing why validation failed
    warnings: List[str]  # non-fatal remarks collected during validation

    def __post_init__(self):
        # Normalise a None list to a fresh empty list so callers can always
        # iterate or index without a None check.
        for attr in ("errors", "warnings"):
            if getattr(self, attr) is None:
                setattr(self, attr, [])
class InputValidationError(Exception):
    """Raised when an input value fails validation."""
class SimpleInputValidator:
    """Simplified user-input validator used by the tests in this module.

    The validator flags strings that match a fixed set of SQL / NoSQL
    injection regexes, enforces a maximum length, HTML-escapes the value
    and strips control characters. In strict mode findings are reported as
    errors (the value is invalid); otherwise they are reported as warnings.
    """

    # Regexes whose match marks a string as a potential SQL injection.
    SQL_INJECTION_PATTERNS = [
        r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|EXECUTE)\b)",
        r"(\b(UNION|OR|AND)\s+\d+\s*=\s*\d+)",
        r"(--|#|/\*|\*/)",
        r"(\b(SCRIPT|JAVASCRIPT|VBSCRIPT|ONLOAD|ONERROR)\b)",
        r"([\'\";])",
        r"(\bxp_cmdshell\b)",
        r"(\bsp_executesql\b)"
    ]

    # Regexes whose match marks a string as a potential NoSQL injection.
    NOSQL_INJECTION_PATTERNS = [
        r"(\$where|\$regex|\$ne|\$gt|\$lt|\$in|\$nin)",
        r"(function\s*\(|\beval\b|\bsetTimeout\b)",
        r"(\{\s*\$.*\})",
        r"(this\.|db\.)"
    ]

    # Every ASCII control character; used to keep log output on one line.
    _LOG_CONTROL_CHARS = re.compile(r"[\x00-\x1F\x7F]")

    def __init__(self, strict_mode: bool = True):
        """Create a validator.

        Args:
            strict_mode: When True, findings are recorded as errors;
                when False, they are recorded as warnings.
        """
        self.strict_mode = strict_mode
        self.log_sensitive = False
        # Compile once so repeated validations do not re-parse the regexes.
        self._sql_patterns = [
            re.compile(pattern, re.IGNORECASE)
            for pattern in self.SQL_INJECTION_PATTERNS
        ]
        self._nosql_patterns = [
            re.compile(pattern, re.IGNORECASE)
            for pattern in self.NOSQL_INJECTION_PATTERNS
        ]

    def validate_string(self, value: str, max_length: int = 1000,
                        allow_html: bool = False,
                        field_name: str = "input") -> "ValidationResult":
        """Validate and sanitise a string.

        Args:
            value: The value to check.
            max_length: Maximum accepted length of ``value``.
            allow_html: When False, the sanitised value is HTML-escaped.
            field_name: Name used as prefix in error/warning messages.

        Returns:
            A ValidationResult. ``is_valid`` is True when no error was
            recorded. ``sanitized_value`` is None for non-string input;
            otherwise it is returned even when the value is invalid. Because
            escaping happens after truncation, the sanitised value may be
            longer than ``max_length``.
        """
        errors: List[str] = []
        warnings: List[str] = []

        if not isinstance(value, str):
            errors.append(f"{field_name} must be a string")
            return ValidationResult(False, None, errors, warnings)

        sanitized = value

        # Length: reject in strict mode, truncate otherwise.
        if len(value) > max_length:
            if self.strict_mode:
                errors.append(f"{field_name} exceeds maximum length of {max_length}")
            else:
                warnings.append(f"{field_name} truncated to {max_length} characters")
                sanitized = value[:max_length]

        # Injection checks run on the original value. Each category is
        # reported once, however many of its patterns match, so the
        # error list does not fill up with identical messages.
        if any(pattern.search(value) for pattern in self._sql_patterns):
            if self.strict_mode:
                errors.append(f"{field_name} contains potential SQL injection pattern")
            else:
                warnings.append(f"{field_name} contains suspicious SQL pattern")

        if any(pattern.search(value) for pattern in self._nosql_patterns):
            if self.strict_mode:
                errors.append(f"{field_name} contains potential NoSQL injection pattern")
            else:
                warnings.append(f"{field_name} contains suspicious NoSQL pattern")

        if not allow_html:
            sanitized = html.escape(sanitized)

        # Drop control characters; tab, LF and CR are deliberately kept.
        sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized)

        return ValidationResult(not errors, sanitized, errors, warnings)

    @staticmethod
    def _escape_control_char(match) -> str:
        """Return the visible escape (e.g. ``\\n``, ``\\x1b``) for a control char."""
        # repr() of a one-character control string is its quoted escape
        # sequence; strip the surrounding quotes.
        return repr(match.group())[1:-1]

    def sanitize_for_logging(self, data: Any, field_name: str = "data") -> str:
        """Render ``data`` as a single-line string that is safe to log.

        dicts and lists are serialised as compact JSON, anything else goes
        through ``str()``. Control characters (including CR/LF) are replaced
        by their visible escape so a value cannot forge extra log lines.
        The text is then capped at 200 characters plus ``"..."`` and
        HTML-escaped; the escaping can lengthen the result beyond that cap.

        Args:
            data: Value to render.
            field_name: Name used in the fallback marker.

        Returns:
            The sanitised text, or ``"<field_name>[unprintable:<type>]"``
            when the value cannot be converted to text.
        """
        try:
            if isinstance(data, (dict, list)):
                data_str = json.dumps(data, ensure_ascii=True, separators=(',', ':'))
            else:
                data_str = str(data)

            # Neutralise control characters before truncating so the size
            # limit applies to the text that is actually logged.
            data_str = self._LOG_CONTROL_CHARS.sub(self._escape_control_char, data_str)

            if len(data_str) > 200:
                data_str = data_str[:200] + "..."

            return html.escape(data_str)
        except Exception:
            # Deliberate best effort: logging must never fail because a
            # value has a broken __str__ or is not JSON-serialisable.
            return f"{field_name}[unprintable:{type(data).__name__}]"
def validate_string_input(value: str, max_length: int = 1000,
                          allow_html: bool = False, field_name: str = "input") -> str:
    """Validate a string in strict mode and return its sanitised form.

    Args:
        value: The value to check.
        max_length: Maximum accepted length.
        allow_html: When False, the returned value is HTML-escaped.
        field_name: Name used in the error message.

    Returns:
        The sanitised value.

    Raises:
        InputValidationError: If strict validation records any error.
    """
    outcome = SimpleInputValidator(strict_mode=True).validate_string(
        value, max_length, allow_html, field_name
    )
    if outcome.is_valid:
        return outcome.sanitized_value

    details = '; '.join(outcome.errors)
    raise InputValidationError(f"Validation failed for {field_name}: {details}")
class TestSimpleInputValidator:
    """Tests for the SimpleInputValidator class."""

    def setup_method(self):
        """Create one strict and one lenient validator for each test."""
        self.validator = SimpleInputValidator(strict_mode=True)
        self.lenient_validator = SimpleInputValidator(strict_mode=False)

    def test_validate_string_basic(self):
        """A harmless string is valid and returned unchanged."""
        result = self.validator.validate_string("hello world", field_name="test")
        assert result.is_valid
        assert result.sanitized_value == "hello world"
        assert len(result.errors) == 0
        assert len(result.warnings) == 0

    def test_validate_string_sql_injection_strict(self):
        """Strict mode rejects SQL-injection payloads."""
        malicious_inputs = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "UNION SELECT * FROM passwords"
        ]
        for malicious_input in malicious_inputs:
            result = self.validator.validate_string(malicious_input)
            assert not result.is_valid, f"Should reject: {malicious_input}"
            assert any("SQL injection" in error for error in result.errors)

    def test_validate_string_nosql_injection_strict(self):
        """Strict mode rejects NoSQL-injection payloads."""
        malicious_inputs = [
            '{"$where": "this.username == this.password"}',
            '{"$regex": ".*"}',
            'function() { return true; }'
        ]
        for malicious_input in malicious_inputs:
            result = self.validator.validate_string(malicious_input)
            assert not result.is_valid, f"Should reject: {malicious_input}"
            # Every payload also contains quotes or ';', which trigger the
            # SQL rule, so checking for a bare "injection" would pass even
            # with NoSQL detection broken. Require the NoSQL message.
            assert any("NoSQL injection" in error for error in result.errors), (
                f"Should flag as NoSQL: {malicious_input}"
            )

    def test_validate_string_html_escape(self):
        """HTML is escaped in the sanitised value.

        Note: '<script>alert("xss")</script>' contains double quotes, which
        trigger the SQL-injection rule in strict mode. Escaping still works,
        but is_valid is False because of the SQL patterns.
        """
        html_input = '<script>alert("xss")</script>'
        result = self.validator.validate_string(html_input, allow_html=False)
        # In strict mode the quotes trigger the SQL-injection detection.
        assert not result.is_valid
        assert "&lt;script&gt;" in result.sanitized_value
        assert "&lt;/script&gt;" in result.sanitized_value

        # Same check with HTML that contains no quotes.
        simple_html = '<b>bold</b>'
        result2 = self.validator.validate_string(simple_html, allow_html=False)
        assert result2.is_valid
        assert "&lt;b&gt;" in result2.sanitized_value

    def test_validate_string_max_length_strict(self):
        """Strict mode rejects a value longer than max_length."""
        long_string = "a" * 1001
        result = self.validator.validate_string(long_string, max_length=1000)
        assert not result.is_valid
        assert "exceeds maximum length" in result.errors[0]

    def test_validate_string_max_length_lenient(self):
        """Lenient mode truncates a value longer than max_length."""
        long_string = "a" * 1001
        result = self.lenient_validator.validate_string(long_string, max_length=1000)
        assert result.is_valid
        assert len(result.sanitized_value) == 1000
        assert "truncated" in result.warnings[0]

    def test_sanitize_for_logging_basic(self):
        """Plain text survives log sanitisation."""
        result = self.validator.sanitize_for_logging("test data", "field")
        assert "test data" in result

    def test_sanitize_for_logging_large_data(self):
        """Large values are truncated for logging."""
        large_data = "x" * 300
        result = self.validator.sanitize_for_logging(large_data, "large_field")
        assert len(result) <= 203  # 200 + "..."
        assert result.endswith("...")

    def test_sanitize_for_logging_html_escape(self):
        """HTML is escaped in log output."""
        malicious_data = '<script>alert("xss")</script>'
        result = self.validator.sanitize_for_logging(malicious_data, "html_field")
        assert "&lt;script&gt;" in result
        assert "<script>" not in result
class TestInputValidationFunctions:
    """Tests for the module-level validation helper functions."""

    def test_validate_string_input_success(self):
        """A harmless string is returned unchanged."""
        sanitized = validate_string_input("hello world", field_name="test")
        assert sanitized == "hello world"

    def test_validate_string_input_failure(self):
        """A SQL-injection payload raises InputValidationError."""
        with pytest.raises(InputValidationError) as caught:
            validate_string_input("'; DROP TABLE users; --", field_name="malicious")
        # Checked after the context manager exits; code placed after the
        # raising call inside the block would never run.
        message = str(caught.value)
        assert "Validation failed for malicious" in message
class TestValidationResult:
    """Tests for the ValidationResult dataclass."""

    def test_validation_result_init(self):
        """Constructor arguments are stored on the matching attributes."""
        fields = {
            "is_valid": True,
            "sanitized_value": "test",
            "errors": ["error1"],
            "warnings": ["warning1"],
        }
        result = ValidationResult(**fields)

        assert result.is_valid
        assert result.sanitized_value == "test"
        assert result.errors == ["error1"]
        assert result.warnings == ["warning1"]
@pytest.mark.integration
class TestInputValidationIntegration:
    """Integration tests for input validation."""

    def test_end_to_end_validation_pipeline(self):
        """Valid data is accepted and malicious data rejected, end to end."""
        validator = SimpleInputValidator(strict_mode=True)

        # Harmless inputs must pass.
        for data in ("hello world", "user@example.com", "normal data 123"):
            outcome = validator.validate_string(data)
            assert outcome.is_valid, f"Should accept valid data: {data}"

        # Injection payloads must be rejected.
        for data in (
            "'; DROP TABLE users; --",
            '{"$where": "this.password"}',
            "UNION SELECT * FROM passwords",
        ):
            outcome = validator.validate_string(data)
            assert not outcome.is_valid, f"Should reject malicious data: {data}"

    def test_strict_vs_lenient_modes(self):
        """Strict mode rejects over-long data; lenient mode truncates it."""
        long_data = "a" * 1500

        strict_result = SimpleInputValidator(strict_mode=True).validate_string(
            long_data, max_length=1000
        )
        lenient_result = SimpleInputValidator(strict_mode=False).validate_string(
            long_data, max_length=1000
        )

        assert not strict_result.is_valid
        assert lenient_result.is_valid
        assert len(lenient_result.sanitized_value) == 1000