"""Tests de sécurité : évaluateur de conditions AST restreint.""" from __future__ import annotations import pytest from core.execution.safe_condition_evaluator import ( SafeConditionEvaluator, UnsafeExpressionError, safe_eval_condition, ) # --------------------------------------------------------------------------- # Cas valides — expressions que les workflows doivent pouvoir évaluer # --------------------------------------------------------------------------- class TestValidExpressions: def test_literal_true(self): assert safe_eval_condition("True", {}) is True def test_literal_false(self): assert safe_eval_condition("False", {}) is False def test_numeric_comparison(self): assert safe_eval_condition("1 < 2", {}) is True assert safe_eval_condition("2 < 1", {}) is False def test_chained_comparison(self): assert safe_eval_condition("1 < 2 < 3", {}) is True assert safe_eval_condition("1 < 3 < 2", {}) is False def test_variable_access(self): assert safe_eval_condition("x > 5", {"x": 10}) is True def test_subscript_dict(self): ctx = {"results": {"step_1": {"score": 0.9}}} assert safe_eval_condition( "results['step_1']['score'] >= 0.8", ctx ) is True def test_boolean_and(self): assert safe_eval_condition("True and False", {}) is False assert safe_eval_condition("True and True", {}) is True def test_boolean_or(self): assert safe_eval_condition("False or True", {}) is True def test_not_operator(self): assert safe_eval_condition("not False", {}) is True def test_arithmetic(self): assert safe_eval_condition("(a + b) * 2 > 10", {"a": 3, "b": 4}) is True def test_in_operator(self): assert safe_eval_condition("'ok' in status", {"status": ["ok", "done"]}) is True def test_list_literal(self): assert safe_eval_condition("x in [1, 2, 3]", {"x": 2}) is True # --------------------------------------------------------------------------- # Cas malveillants — tentatives d'injection / RCE # --------------------------------------------------------------------------- class TestMaliciousExpressions: """Toutes ces expressions DOIVENT lever UnsafeExpressionError.""" def test_rejects_import(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("__import__('os').system('echo pwn')", {}) def test_rejects_function_call(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("print('hello')", {"print": print}) def test_rejects_eval(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("eval('1+1')", {}) def test_rejects_exec(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("exec('x=1')", {}) def test_rejects_dunder_attribute(self): # Classique : remonter à __builtins__ via __class__.__mro__ with pytest.raises(UnsafeExpressionError): safe_eval_condition("x.__class__", {"x": "abc"}) def test_rejects_dunder_subclasses(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition( "x.__class__.__mro__[-1].__subclasses__()", {"x": []}, ) def test_rejects_undefined_variable(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("secret > 0", {}) def test_rejects_lambda(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("(lambda: 42)()", {}) def test_rejects_list_comprehension(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("[x for x in range(3)]", {}) def test_rejects_generator(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("(x for x in [1])", {}) def test_rejects_walrus(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("(x := 1)", {}) def test_rejects_ifexp(self): # IfExp (conditional) non autorisé par défaut — si besoin ajouter plus tard. with pytest.raises(UnsafeExpressionError): safe_eval_condition("1 if True else 2", {}) def test_rejects_starred(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("[*x]", {"x": [1, 2]}) def test_rejects_attribute_call_chain(self): # Même si 'dict' est fourni dans le contexte, on n'autorise pas les # appels de méthode. with pytest.raises(UnsafeExpressionError): safe_eval_condition( "results.keys()", {"results": {"a": 1}} ) def test_rejects_huge_expression(self): big = "0+" * 1000 + "0" with pytest.raises(UnsafeExpressionError): safe_eval_condition(big, {}) def test_rejects_syntax_error(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition("1 + ", {}) def test_rejects_non_string(self): with pytest.raises(UnsafeExpressionError): safe_eval_condition(12345, {}) # type: ignore[arg-type] # --------------------------------------------------------------------------- # Intégration avec DAGExecutor : le step condition doit refuser l'injection # --------------------------------------------------------------------------- class TestDAGExecutorIntegration: def test_condition_step_refuses_malicious_payload(self): """Un workflow injectant __import__ dans 'condition' doit être refusé silencieusement (result = False) sans exécuter le code.""" from core.execution.dag_executor import DAGExecutor, WorkflowStep, StepType executor = DAGExecutor() step = WorkflowStep( step_id="malicious", step_type=StepType.CONDITION, action={"condition": "__import__('os').system('echo PWNED')"}, ) # Accès direct à la méthode privée pour isoler le comportement. result = executor._execute_condition_step(step, step.action) assert result is False def test_condition_step_accepts_safe_expression(self): from core.execution.dag_executor import DAGExecutor, WorkflowStep, StepType executor = DAGExecutor() executor._results["step_prev"] = {"ok": True} step = WorkflowStep( step_id="cond", step_type=StepType.CONDITION, action={"condition": "results['step_prev']['ok']"}, ) result = executor._execute_condition_step(step, step.action) assert result is True