Refonte majeure du système Agent Chat et ajout de nombreux modules : - Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat avec résolution en 3 niveaux (workflow → geste → "montre-moi") - GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique, substitution automatique dans les replays, et endpoint /api/gestures - Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket (approve/skip/abort) avant chaque action - Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent pour feedback visuel pendant le replay - Data Extraction (core/extraction/) : moteur d'extraction visuelle de données (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel - ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison de screenshots, avec logique de retry (max 3) - IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés - Dashboard : nouvelles pages gestures, streaming, extractions - Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants - Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410, suppression du code hardcodé _plan_to_replay_actions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
335 lines
12 KiB
Python
335 lines
12 KiB
Python
"""
|
|
Tests pour le système de validation des entrées utilisateur.
|
|
|
|
Exigence 7.2: Protection contre les injections SQL/NoSQL
|
|
Exigence 7.3: Validation des chemins de fichiers
|
|
Exigence 7.4: Sanitization des données loggées
|
|
"""
|
|
|
|
import pytest
|
|
import sys
|
|
import os
|
|
import re
|
|
import html
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any, List, Optional
|
|
from dataclasses import dataclass
|
|
|
|
# Ajouter le répertoire racine au path pour les imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
# Import direct des composants nécessaires
|
|
from core.security.security_config import get_security_config, hash_sensitive_value
|
|
|
|
|
|
@dataclass
class ValidationResult:
    """Outcome of validating a single input value.

    ``errors`` and ``warnings`` may be omitted or passed as ``None``; in both
    cases they are normalised to a fresh empty list, so consumers can always
    iterate over them or append to them.
    """

    # Whether the input was accepted by the validator that produced this result.
    is_valid: bool
    # Sanitized form of the input, as computed by the producing validator.
    sanitized_value: Any
    # Messages describing blocking problems (None is replaced by []).
    errors: Optional[List[str]] = None
    # Messages describing non-blocking findings (None is replaced by []).
    warnings: Optional[List[str]] = None

    def __post_init__(self) -> None:
        """Replace missing message lists with new, independent empty lists."""
        if self.errors is None:
            self.errors = []
        if self.warnings is None:
            self.warnings = []
|
|
|
|
|
|
class InputValidationError(Exception):
    """Raised when an input value fails validation."""
|
|
|
|
|
|
class SimpleInputValidator:
    """Simplified user-input validator used by the tests in this module.

    Strings are length-checked, screened against regex heuristics for SQL and
    NoSQL injection, optionally HTML-escaped and stripped of control
    characters.  In strict mode findings are recorded as errors; in lenient
    mode they are recorded as warnings and over-long input is truncated.
    """

    # Regex heuristics hinting at SQL injection (matched case-insensitively).
    SQL_INJECTION_PATTERNS = [
        r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|EXECUTE)\b)",
        r"(\b(UNION|OR|AND)\s+\d+\s*=\s*\d+)",
        r"(--|#|/\*|\*/)",
        r"(\b(SCRIPT|JAVASCRIPT|VBSCRIPT|ONLOAD|ONERROR)\b)",
        r"([\'\";])",
        r"(\bxp_cmdshell\b)",
        r"(\bsp_executesql\b)"
    ]

    # Regex heuristics hinting at NoSQL injection (matched case-insensitively).
    NOSQL_INJECTION_PATTERNS = [
        r"(\$where|\$regex|\$ne|\$gt|\$lt|\$in|\$nin)",
        r"(function\s*\(|\beval\b|\bsetTimeout\b)",
        r"(\{\s*\$.*\})",
        r"(this\.|db\.)"
    ]

    # Control characters that could forge or corrupt log lines (TAB is kept).
    _LOG_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0A-\x1F\x7F]")

    def __init__(self, strict_mode: bool = True) -> None:
        """Create a validator.

        Args:
            strict_mode: When True, findings are errors; when False, they are
                warnings and over-long strings are truncated instead of
                rejected.
        """
        self.strict_mode = strict_mode
        # Not read anywhere in this class; kept for interface compatibility.
        self.log_sensitive = False

        # Compile once per instance so validate_string only runs searches.
        self._sql_patterns = [
            re.compile(pattern, re.IGNORECASE)
            for pattern in self.SQL_INJECTION_PATTERNS
        ]
        self._nosql_patterns = [
            re.compile(pattern, re.IGNORECASE)
            for pattern in self.NOSQL_INJECTION_PATTERNS
        ]

    def _report(self, strict_message, lenient_message, errors, warnings) -> None:
        """Record a finding as an error (strict) or a warning (lenient)."""
        if self.strict_mode:
            errors.append(strict_message)
        else:
            warnings.append(lenient_message)

    def validate_string(self, value: str, max_length: int = 1000,
                        allow_html: bool = False,
                        field_name: str = "input") -> "ValidationResult":
        """Validate and sanitize a string.

        Args:
            value: The value to check; anything that is not a ``str`` is
                rejected immediately.
            max_length: Maximum accepted length of ``value``.
            allow_html: When False, the sanitized value is HTML-escaped.
            field_name: Name used as prefix in error and warning messages.

        Returns:
            A ValidationResult.  ``is_valid`` is True when no error was
            recorded.  The sanitized value is computed even when errors were
            found; it is ``None`` only for non-string input.  Because escaping
            happens after truncation, the sanitized value can be longer than
            ``max_length``.
        """
        errors = []
        warnings = []

        if not isinstance(value, str):
            errors.append(f"{field_name} must be a string")
            return ValidationResult(False, None, errors, warnings)

        sanitized = value

        # Length check: reject in strict mode, truncate in lenient mode.
        if len(value) > max_length:
            if self.strict_mode:
                errors.append(f"{field_name} exceeds maximum length of {max_length}")
            else:
                warnings.append(f"{field_name} truncated to {max_length} characters")
                sanitized = value[:max_length]

        # Each injection family is reported at most once: several patterns
        # matching the same input would otherwise add identical messages.
        if any(pattern.search(value) for pattern in self._sql_patterns):
            self._report(
                f"{field_name} contains potential SQL injection pattern",
                f"{field_name} contains suspicious SQL pattern",
                errors, warnings,
            )

        if any(pattern.search(value) for pattern in self._nosql_patterns):
            self._report(
                f"{field_name} contains potential NoSQL injection pattern",
                f"{field_name} contains suspicious NoSQL pattern",
                errors, warnings,
            )

        if not allow_html:
            sanitized = html.escape(sanitized)

        # Drop control characters; TAB, LF and CR are preserved.
        sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized)

        is_valid = len(errors) == 0
        return ValidationResult(is_valid, sanitized, errors, warnings)

    def sanitize_for_logging(self, data: Any, field_name: str = "data") -> str:
        """Render ``data`` as a short, single-line, HTML-escaped string for logs.

        Dicts and lists are serialised as compact ASCII JSON; anything else
        goes through ``str()``.  Line breaks and other control characters
        (except TAB) are rendered as visible ``\\xNN`` escapes so a value
        cannot inject additional log lines.  The text is cut to 200 characters
        plus ``"..."`` before HTML escaping, so escaping may lengthen it.

        Args:
            data: The value to render.
            field_name: Name used in the placeholder when ``data`` cannot be
                turned into text.

        Returns:
            The sanitized text, or ``"<field_name>[unprintable:<type>]"`` when
            serialisation fails.
        """
        try:
            if isinstance(data, (dict, list)):
                text = json.dumps(data, ensure_ascii=True, separators=(',', ':'))
            else:
                text = str(data)
        except Exception:
            # Deliberate best effort: logging must never raise because of the
            # payload (unserialisable objects, failing __str__, ...).
            return f"{field_name}[unprintable:{type(data).__name__}]"

        # Neutralise CR/LF and other control characters (log forging).
        text = self._LOG_CONTROL_CHARS.sub(
            lambda match: f"\\x{ord(match.group()):02x}", text
        )

        # Keep log entries short.
        if len(text) > 200:
            text = text[:200] + "..."

        # Escape after truncating so an entity is never cut in half.
        return html.escape(text)
|
|
|
|
|
|
def validate_string_input(value: str, max_length: int = 1000,
                          allow_html: bool = False, field_name: str = "input") -> str:
    """Validate a string in strict mode and return its sanitized form.

    Args:
        value: The string to validate.
        max_length: Maximum accepted length.
        allow_html: When False, the returned value is HTML-escaped.
        field_name: Name used in error messages.

    Returns:
        The sanitized value produced by the validator.

    Raises:
        InputValidationError: If the validator recorded at least one error;
            the message lists all errors separated by "; ".
    """
    outcome = SimpleInputValidator(strict_mode=True).validate_string(
        value,
        max_length=max_length,
        allow_html=allow_html,
        field_name=field_name,
    )

    if outcome.is_valid:
        return outcome.sanitized_value

    details = '; '.join(outcome.errors)
    raise InputValidationError(f"Validation failed for {field_name}: {details}")
|
|
|
|
|
|
class TestSimpleInputValidator:
    """Tests for the SimpleInputValidator class."""

    def setup_method(self):
        """Create one strict and one lenient validator for each test."""
        self.validator = SimpleInputValidator(strict_mode=True)
        self.lenient_validator = SimpleInputValidator(strict_mode=False)

    def test_validate_string_basic(self):
        """A harmless string is accepted unchanged, without findings."""
        result = self.validator.validate_string("hello world", field_name="test")

        assert result.is_valid
        assert result.sanitized_value == "hello world"
        assert len(result.errors) == 0
        assert len(result.warnings) == 0

    def test_validate_string_sql_injection_strict(self):
        """SQL injection payloads are rejected in strict mode."""
        malicious_inputs = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "UNION SELECT * FROM passwords"
        ]

        for malicious_input in malicious_inputs:
            result = self.validator.validate_string(malicious_input)
            assert not result.is_valid, f"Should reject: {malicious_input}"
            assert any("SQL injection" in error for error in result.errors)

    def test_validate_string_nosql_injection_strict(self):
        """NoSQL injection payloads are rejected in strict mode."""
        malicious_inputs = [
            '{"$where": "this.username == this.password"}',
            '{"$regex": ".*"}',
            'function() { return true; }'
        ]

        for malicious_input in malicious_inputs:
            result = self.validator.validate_string(malicious_input)
            assert not result.is_valid, f"Should reject: {malicious_input}"
            assert any("injection" in error for error in result.errors)

    def test_validate_string_html_escape(self):
        """HTML is escaped in the sanitized value.

        Note: '<script>alert("xss")</script>' contains double quotes (and the
        keyword ``script``), which trigger the SQL injection heuristics in
        strict mode.  The value is still HTML-escaped, but is_valid is False.
        """
        html_input = '<script>alert("xss")</script>'
        result = self.validator.validate_string(html_input, allow_html=False)

        # Rejected by the SQL heuristics, yet the sanitized value is escaped.
        assert not result.is_valid
        assert "&lt;script&gt;" in result.sanitized_value
        assert "&lt;/script&gt;" in result.sanitized_value
        assert "<script>" not in result.sanitized_value

        # HTML without quotes passes validation and is escaped as well.
        simple_html = '<b>bold</b>'
        result2 = self.validator.validate_string(simple_html, allow_html=False)
        assert result2.is_valid
        assert "&lt;b&gt;" in result2.sanitized_value
        assert "<b>" not in result2.sanitized_value

    def test_validate_string_max_length_strict(self):
        """Over-long input is an error in strict mode."""
        long_string = "a" * 1001
        result = self.validator.validate_string(long_string, max_length=1000)

        assert not result.is_valid
        assert "exceeds maximum length" in result.errors[0]

    def test_validate_string_max_length_lenient(self):
        """Over-long input is truncated with a warning in lenient mode."""
        long_string = "a" * 1001
        result = self.lenient_validator.validate_string(long_string, max_length=1000)

        assert result.is_valid
        assert len(result.sanitized_value) == 1000
        assert "truncated" in result.warnings[0]

    def test_sanitize_for_logging_basic(self):
        """Plain text is preserved by log sanitization."""
        result = self.validator.sanitize_for_logging("test data", "field")
        assert "test data" in result

    def test_sanitize_for_logging_large_data(self):
        """Large values are shortened and marked with an ellipsis."""
        large_data = "x" * 300
        result = self.validator.sanitize_for_logging(large_data, "large_field")

        assert len(result) <= 203  # 200 + "..."
        assert result.endswith("...")

    def test_sanitize_for_logging_html_escape(self):
        """HTML is escaped in log output; raw tags must not survive."""
        malicious_data = '<script>alert("xss")</script>'
        result = self.validator.sanitize_for_logging(malicious_data, "html_field")

        assert "&lt;script&gt;" in result
        assert "<script>" not in result
|
|
|
|
|
|
class TestInputValidationFunctions:
    """Checks for the module-level validation helper."""

    def test_validate_string_input_success(self):
        """A harmless string comes back unchanged."""
        assert validate_string_input("hello world", field_name="test") == "hello world"

    def test_validate_string_input_failure(self):
        """An injection payload raises InputValidationError naming the field."""
        payload = "'; DROP TABLE users; --"

        with pytest.raises(InputValidationError) as caught:
            validate_string_input(payload, field_name="malicious")

        message = str(caught.value)
        assert "Validation failed for malicious" in message
|
|
|
|
|
|
class TestValidationResult:
    """Checks for the ValidationResult dataclass."""

    def test_validation_result_init(self):
        """Keyword arguments are stored on the matching attributes."""
        fields = {
            "is_valid": True,
            "sanitized_value": "test",
            "errors": ["error1"],
            "warnings": ["warning1"],
        }
        outcome = ValidationResult(**fields)

        assert outcome.is_valid
        assert outcome.sanitized_value == "test"
        assert outcome.errors == ["error1"]
        assert outcome.warnings == ["warning1"]
|
|
|
|
|
|
@pytest.mark.integration
class TestInputValidationIntegration:
    """Integration-level checks for input validation."""

    def test_end_to_end_validation_pipeline(self):
        """Benign values pass and malicious payloads are rejected."""
        validator = SimpleInputValidator(strict_mode=True)

        accepted = ("hello world", "user@example.com", "normal data 123")
        rejected = (
            "'; DROP TABLE users; --",
            '{"$where": "this.password"}',
            "UNION SELECT * FROM passwords",
        )

        # Benign values must pass.
        for data in accepted:
            outcome = validator.validate_string(data)
            assert outcome.is_valid, f"Should accept valid data: {data}"

        # Injection payloads must be refused.
        for data in rejected:
            outcome = validator.validate_string(data)
            assert not outcome.is_valid, f"Should reject malicious data: {data}"

    def test_strict_vs_lenient_modes(self):
        """Over-long input fails in strict mode and is truncated in lenient mode."""
        limit = 1000
        oversized = "a" * 1500

        strict_outcome = SimpleInputValidator(strict_mode=True).validate_string(
            oversized, max_length=limit
        )
        lenient_outcome = SimpleInputValidator(strict_mode=False).validate_string(
            oversized, max_length=limit
        )

        assert not strict_outcome.is_valid
        assert lenient_outcome.is_valid
        assert len(lenient_outcome.sanitized_value) == 1000