From 89f8a019180ea8d54ba9ad81d91709aac3a4d5f5 Mon Sep 17 00:00:00 2001 From: Vichym Date: Fri, 22 Aug 2025 13:26:04 -0700 Subject: [PATCH 1/5] add resource property validation rule and logic --- samtranslator/model/__init__.py | 210 ++++++++++++-- samtranslator/model/sam_resources.py | 18 +- tests/test_model.py | 409 ++++++++++++++++++++++++++- 3 files changed, 609 insertions(+), 28 deletions(-) diff --git a/samtranslator/model/__init__.py b/samtranslator/model/__init__.py index e47131793..6b25838de 100644 --- a/samtranslator/model/__init__.py +++ b/samtranslator/model/__init__.py @@ -1,9 +1,10 @@ -""" CloudFormation Resource serialization, deserialization, and validation """ +"""CloudFormation Resource serialization, deserialization, and validation""" import inspect import re from abc import ABC, ABCMeta, abstractmethod from contextlib import suppress +from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar from samtranslator.compat import pydantic @@ -163,7 +164,7 @@ def __init__( self.set_resource_attribute(attr, value) @classmethod - def get_supported_resource_attributes(cls): # type: ignore[no-untyped-def] + def get_supported_resource_attributes(cls) -> Tuple[str, ...]: """ A getter method for the supported resource attributes returns: a tuple that contains the name of all supported resource attributes @@ -205,7 +206,7 @@ def from_dict(cls, logical_id: str, resource_dict: Dict[str, Any], relative_id: resource = cls(logical_id, relative_id=relative_id) - resource._validate_resource_dict(logical_id, resource_dict) # type: ignore[no-untyped-call] + resource._validate_resource_dict(logical_id, resource_dict) # Default to empty properties dictionary. If customers skip the Properties section, an empty dictionary # accurately captures the intent. @@ -247,7 +248,7 @@ def _validate_logical_id(logical_id: Optional[Any]) -> str: raise InvalidResourceException(str(logical_id), "Logical ids must be alphanumeric.") @classmethod - def _validate_resource_dict(cls, logical_id, resource_dict): # type: ignore[no-untyped-def] + def _validate_resource_dict(cls, logical_id: str, resource_dict: Dict[str, Any]) -> None: """Validates that the provided resource dict contains the correct Type string, and the required Properties dict. :param dict resource_dict: the resource dict to validate @@ -335,22 +336,82 @@ def __setattr__(self, name, value): # type: ignore[no-untyped-def] ) # Note: For compabitliy issue, we should ONLY use this with new abstraction/resources. - def validate_properties_and_return_model(self, cls: Type[RT]) -> RT: + def validate_properties_and_return_model(self, cls: Type[RT], collect_all_errors: bool = False) -> RT: """ Given a resource properties, return a typed object from the definitions of SAM schema model - param: - resource_properties: properties from input template + Args: cls: schema models + collect_all_errors: If True, collect all validation errors. If False (default), only first error. """ try: return cls.parse_obj(self._generate_resource_dict()["Properties"]) except pydantic.error_wrappers.ValidationError as e: + if collect_all_errors: + # Comprehensive error collection with union type consolidation + error_messages = self._format_all_errors(e.errors()) # type: ignore[arg-type] + raise InvalidResourceException(self.logical_id, " ".join(error_messages)) from e error_properties: str = "" with suppress(KeyError): error_properties = ".".join(str(x) for x in e.errors()[0]["loc"]) raise InvalidResourceException(self.logical_id, f"Property '{error_properties}' is invalid.") from e + def _format_all_errors(self, errors: List[Dict[str, Any]]) -> List[str]: + """Format all validation errors, consolidating union type errors in single pass.""" + type_mapping = { + "not a valid dict": "dictionary", + "not a valid int": "integer", + "not a valid float": "number", + "not a valid list": "list", + "not a valid str": "string", + } + + # Group errors by path in a single pass + path_to_errors: Dict[str, Dict[str, Any]] = {} + + for error in errors: + property_path = ".".join(str(x) for x in error["loc"]) + raw_message = error.get("msg", "") + + # Extract type for union consolidation + extracted_type = None + for pattern, type_name in type_mapping.items(): + if pattern in raw_message: + extracted_type = type_name + break + + if property_path not in path_to_errors: + path_to_errors[property_path] = {"types": [], "error": error} + + if extracted_type: + path_to_errors[property_path]["types"].append(extracted_type) + + # Format messages based on collected data + result = [] + for path, data in path_to_errors.items(): + unique_types = list(dict.fromkeys(data["types"])) # Remove duplicates, preserve order + + if len(unique_types) > 1: + # Multiple types - consolidate with union + type_text = " or ".join(unique_types) + result.append(f"Property '{path}' value must be {type_text}.") + else: + # Single or no types - format normally + result.append(self._format_single_error(data["error"])) + + return result + + def _format_single_error(self, error: Dict[str, Any]) -> str: + """Format a single Pydantic error into user-friendly message.""" + property_path = ".".join(str(x) for x in error["loc"]) + raw_message = error["msg"] + + if error["type"] == "value_error.missing": + return f"Property '{property_path}' is required." + if "extra fields not permitted" in raw_message: + return f"Property '{property_path}' is an invalid property." + return f"Property '{property_path}' {raw_message.lower()}." + def validate_properties(self) -> None: """Validates that the required properties for this Resource have been populated, and that all properties have valid values. @@ -481,6 +542,16 @@ def to_cloudformation(self, **kwargs: Any) -> List[Any]: """ +class ValidationRule(Enum): + MUTUALLY_EXCLUSIVE = "mutually_exclusive" + MUTUALLY_INCLUSIVE = "mutually_inclusive" + CONDITIONAL_REQUIREMENT = "conditional_requirement" + + +# Simple tuple-based rules: (rule_type, [property_names]) +PropertyRule = Tuple[ValidationRule, List[str]] + + class SamResourceMacro(ResourceMacro, metaclass=ABCMeta): """ResourceMacro that specifically refers to SAM (AWS::Serverless::*) resources.""" @@ -536,14 +607,14 @@ def get_resource_references(self, generated_cfn_resources, supported_resource_re def _construct_tag_list( self, tags: Optional[Dict[str, Any]], additional_tags: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: - if not bool(tags): - tags = {} + tags_dict: Dict[str, Any] = tags or {} if additional_tags is None: additional_tags = {} + # At this point tags is guaranteed to be a Dict[str, Any] since we set it to {} if it was falsy for tag in self._RESERVED_TAGS: - self._check_tag(tag, tags) # type: ignore[no-untyped-call] + self._check_tag(tag, tags_dict) sam_tag = {self._SAM_KEY: self._SAM_VALUE} @@ -553,6 +624,40 @@ def _construct_tag_list( # customer's knowledge. return get_tag_list(sam_tag) + get_tag_list(additional_tags) + get_tag_list(tags) + @staticmethod + def propagate_tags_combine( + resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False + ) -> None: + """ + Propagates tags to the resources + Similar to propagate_tags() method but this method will combine provided tags with existing resource tags. + + Note: + - This method created after propagate_tags() to combine propagate tags with resource tags create during to_cloudformation() + - Create this method because updating propagate_tags() will cause regression issue; + - Use this method for new resource if you want to assign combined tags, not replace. + + :param propagate_tags: Whether we should pass the tags to generated resources. + :param resources: List of generated resources + :param tags: dictionary of tags to propagate to the resources. + + :return: None + """ + if not propagate_tags or not tags: + return + + for resource in resources: + if hasattr(resource, "Tags"): + if resource.Tags: + propagated_tags = get_tag_list(tags) + combined_tags = [ + {"Key": k, "Value": v} + for k, v in {tag["Key"]: tag["Value"] for tag in resource.Tags + propagated_tags}.items() + ] + resource.Tags = combined_tags + else: + resource.assign_tags(tags) + @staticmethod def propagate_tags( resources: List[Resource], tags: Optional[Dict[str, Any]], propagate_tags: Optional[bool] = False @@ -572,7 +677,7 @@ def propagate_tags( for resource in resources: resource.assign_tags(tags) - def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def] + def _check_tag(self, reserved_tag_name: str, tags: Dict[str, Any]) -> None: if reserved_tag_name in tags: raise InvalidResourceException( self.logical_id, @@ -582,6 +687,71 @@ def _check_tag(self, reserved_tag_name, tags): # type: ignore[no-untyped-def] "input.", ) + def validate_before_transform(self, schema_class: Optional[Type[RT]], collect_all_errors: bool = False) -> None: + if not hasattr(self, "__validation_rules__"): + return + + rules = self.__validation_rules__ + validated_model = ( + self.validate_properties_and_return_model(schema_class, collect_all_errors) if schema_class else None + ) + + error_messages = [] + + for rule_type, properties in rules: + present = [prop for prop in properties if self._get_property_value(prop, validated_model) is not None] + if rule_type == ValidationRule.MUTUALLY_EXCLUSIVE: + # Check if more than one property exists + if len(present) > 1: + prop_names = [f"'{p}'" for p in present] + error_messages.append(f"Cannot specify {self._combine_string(prop_names)} together.") + + elif rule_type == ValidationRule.MUTUALLY_INCLUSIVE: + # If any property in the group is present, then all properties in the group must be present + # Check if some but not all properties from the group are present + missing_some_properties = 0 < len(present) < len(properties) + if missing_some_properties: + error_messages.append(f"Properties must be used together: {self._combine_string(properties)}.") + + elif ( + rule_type == ValidationRule.CONDITIONAL_REQUIREMENT + and self._get_property_value(properties[0], validated_model) is not None + and self._get_property_value(properties[1], validated_model) is None + ): + # First property requires second property + error_messages.append(f"'{properties[0]}' requires '{properties[1]}'.") + + # If there are any validation errors, raise a single exception with all error messages + if error_messages: + raise InvalidResourceException(self.logical_id, "\n".join(error_messages)) + + def _combine_string(self, words: List[str]) -> str: + return ", ".join(words[:-1]) + (" and " + words[-1] if len(words) > 1 else words[0] if words else "") + + def _get_property_value(self, prop: str, validated_model: Any = None) -> Any: + """Original property value getter. Supports nested properties with dot notation.""" + if "." not in prop: + # Simple property - use existing logic for direct attributes + return getattr(self, prop, None) + + # Nested property - use validated model + if validated_model is None: + return None + + try: + # Navigate through nested properties using dot notation + value = validated_model + for part in prop.split("."): + if hasattr(value, part): + value = getattr(value, part) + if value is None: + return None + else: + return None + return value + except Exception: + return None + class ResourceTypeResolver: """ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to @@ -643,7 +813,7 @@ def get_all_resources(self) -> Dict[str, Any]: """Return a dictionary of all resources from the SAM template.""" return self.resources - def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]: + def get_resource_by_logical_id(self, _input: str) -> Optional[Dict[str, Any]]: """ Recursively find resource with matching Logical ID that are present in the template and returns the value. If it is not in template, this method simply returns the input unchanged. @@ -661,16 +831,16 @@ def get_resource_by_logical_id(self, _input: str) -> Dict[str, Any]: __all__: List[str] = [ "IS_DICT", "IS_STR", - "Validator", - "any_type", - "is_type", - "PropertyType", - "Property", - "PassThroughProperty", "MutatedPassThroughProperty", + "PassThroughProperty", + "Property", + "PropertyType", "Resource", "ResourceMacro", - "SamResourceMacro", - "ResourceTypeResolver", "ResourceResolver", + "ResourceTypeResolver", + "SamResourceMacro", + "Validator", + "any_type", + "is_type", ] diff --git a/samtranslator/model/sam_resources.py b/samtranslator/model/sam_resources.py index 2cdbc3c8b..35c324d6b 100644 --- a/samtranslator/model/sam_resources.py +++ b/samtranslator/model/sam_resources.py @@ -1,4 +1,4 @@ -""" SAM macro definitions """ +"""SAM macro definitions""" import copy import re @@ -139,6 +139,7 @@ class SamFunction(SamResourceMacro): """SAM function macro.""" resource_type = "AWS::Serverless::Function" + property_types = { "FunctionName": PropertyType(False, one_of(IS_STR, IS_DICT)), "Handler": PassThroughProperty(False), @@ -252,6 +253,15 @@ class SamFunction(SamResourceMacro): "DestinationQueue": SQSQueue.resource_type, } + # Validation rules + __validation_rules__ = [ + # TODO: To enable these rules, we need to update translator test input/output files to property configure template + # to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state + # (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]), + # (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]), + # (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]), + ] + def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]: try: return {"event_resources": self._event_resources_to_link(resources)} @@ -274,6 +284,11 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P conditions = kwargs.get("conditions", {}) feature_toggle = kwargs.get("feature_toggle") + # TODO: Skip pass schema_class=aws_serverless_function.Properties to skip schema validation for now. + # - adding this now would required update error message in error error_function_*_test.py + # - add this when we can verify that changing error message would not break customers + # self.validate_before_transform(schema_class=None, collect_all_errors=False) + if self.DeadLetterQueue: self._validate_dlq(self.DeadLetterQueue) @@ -1809,6 +1824,7 @@ def _validate_architectures(self, lambda_layer: LambdaLayerVersion) -> None: # Intrinsics are not validated if is_intrinsic(architectures): return + for arq in architectures: # We validate the values only if we they're not intrinsics if not is_intrinsic(arq) and arq not in [ARM64, X86_64]: diff --git a/tests/test_model.py b/tests/test_model.py index 4e003172b..9b78fed9f 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -1,11 +1,13 @@ -from typing import Any, List +from typing import Any, List, Optional from unittest import TestCase from unittest.mock import Mock import pytest +from samtranslator.internal.schema_source.common import BaseModel from samtranslator.intrinsics.resource_refs import SupportedResourceReferences -from samtranslator.model import PropertyType, Resource, ResourceTypeResolver, SamResourceMacro +from samtranslator.model import PropertyType, Resource, ResourceTypeResolver, SamResourceMacro, ValidationRule from samtranslator.model.exceptions import InvalidResourceException +from samtranslator.model.types import IS_BOOL, IS_DICT, IS_INT, IS_STR from samtranslator.plugins import LifeCycleEvents @@ -171,10 +173,13 @@ def test_resource_must_override_runtime_attributes(self): class NewResource(Resource): resource_type = "foo" property_types = {} - runtime_attrs = {"attr1": Mock(), "attr2": Mock()} - runtime_attrs["attr1"].return_value = "value1" - runtime_attrs["attr2"].return_value = "value2" + mock_attr1 = Mock() + mock_attr2 = Mock() + mock_attr1.return_value = "value1" + mock_attr2.return_value = "value2" + + runtime_attrs = {"attr1": mock_attr1, "attr2": mock_attr2} logical_id = "SomeId" resource = NewResource(logical_id) @@ -336,12 +341,12 @@ class TestResourceTypeResolver(TestCase): def test_can_resolve_must_handle_null_resource_dict(self): resolver = ResourceTypeResolver() - self.assertFalse(resolver.can_resolve(None)) + self.assertFalse(resolver.can_resolve(None)) # type: ignore[arg-type] def test_can_resolve_must_handle_non_dict(self): resolver = ResourceTypeResolver() - self.assertFalse(resolver.can_resolve("some string value")) + self.assertFalse(resolver.can_resolve("some string value")) # type: ignore[arg-type] def test_can_resolve_must_handle_dict_without_type(self): resolver = ResourceTypeResolver() @@ -391,3 +396,393 @@ class MyResource(Resource): mock_sam_plugins.act.assert_called_once_with( LifeCycleEvents.before_transform_resource, "logicalId", resource_type, expected_properties ) + + +class TestSamResourceMacro(TestCase): + """ + Test validation functionality in SamResourceMacro + + IMPORTANT: These tests document the CURRENT BEHAVIOR of the validation system, + which has known bugs. The validation logic incorrectly treats property conditions + that evaluate to False as "present" for validation purposes. + + Known Issues: + 1. Property conditions like "Type=CUSTOM" that evaluate to False are still + considered "present" because False != None + 2. Boolean comparisons fail because True != "True" (string comparison) + 3. Integer comparisons fail because 42 != "42" (string comparison) + 4. Validation rules trigger even when their conditions don't match + """ + + def setUp(self): + """Set up common test fixtures""" + self.ValidationRule = ValidationRule + + # Test setting properties using BaseModel from samtranslator.internal.schema_source.common + class TestSettingProperties(BaseModel): + NestedVar1: Optional[str] = None + NestedVar2: Optional[str] = None + + # Comprehensive schema class for testing using BaseModel from samtranslator.internal.schema_source.common + class TestProperties(BaseModel): + ConditionalVar1: Optional[int] = None + ConditionalVar2: Optional[int] = None + ConditionalVar3: Optional[int] = None + ExclusiveVar1: Optional[str] = None + ExclusiveVar2: Optional[str] = None + ExclusiveVar3: Optional[str] = None + InclusiveVar1: Optional[bool] = None + InclusiveVar2: Optional[bool] = None + InclusiveVar3: Optional[bool] = None + NestedSetting1: Optional[TestSettingProperties] = None + NestedSetting2: Optional[TestSettingProperties] = None + NestedSetting3: Optional[TestSettingProperties] = None + + self.TestProperties = TestProperties + self.TestSettingProperties = TestSettingProperties + + # Reusable test resource class extending SamResourceMacro + class TestResource(SamResourceMacro): + resource_type = "Test::Resource" + property_types = { + "ConditionalVar1": PropertyType(False, IS_INT), + "ConditionalVar2": PropertyType(False, IS_INT), + "ConditionalVar3": PropertyType(False, IS_INT), + "ExclusiveVar1": PropertyType(False, IS_STR), + "ExclusiveVar2": PropertyType(False, IS_STR), + "ExclusiveVar3": PropertyType(False, IS_STR), + "InclusiveVar1": PropertyType(False, IS_BOOL), + "InclusiveVar2": PropertyType(False, IS_BOOL), + "InclusiveVar3": PropertyType(False, IS_BOOL), + "NestedSetting1": PropertyType(False, IS_DICT), + "NestedSetting2": PropertyType(False, IS_DICT), + "NestedSetting3": PropertyType(False, IS_DICT), + } + + def to_cloudformation(self, **kwargs): + return [] + + self.TestResource = TestResource + + class TestValidateBeforeTransform(TestCase): + """Test cases for validate_before_transform() method""" + + def setUp(self): + """Set up test fixtures for validate_before_transform tests""" + # Import from parent class + parent = TestSamResourceMacro() + parent.setUp() + self.ValidationRule = parent.ValidationRule + self.TestProperties = parent.TestProperties + self.TestSettingProperties = parent.TestSettingProperties + self.TestResource = parent.TestResource + + def test_validate_before_transform_all_rules_pass(self): + """Test successful validation when all rules are satisfied""" + + class Resource(self.TestResource): + __validation_rules__ = [ + (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar2"]), + (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar2", "ConditionalVar3"]), + ( + self.ValidationRule.CONDITIONAL_REQUIREMENT, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], + ), + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar1", "ExclusiveVar2"]), + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar2", "ExclusiveVar3"]), + ( + self.ValidationRule.MUTUALLY_EXCLUSIVE, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1"], + ), + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["NestedSetting1.NestedVar2", "ExclusiveVar3"]), + (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "ExclusiveVar1"]), + ( + self.ValidationRule.MUTUALLY_INCLUSIVE, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], + ), + ( + self.ValidationRule.MUTUALLY_INCLUSIVE, + ["NestedSetting2.NestedVar2", "NestedSetting3.NestedVar2"], + ), + ] + + resource = Resource("TestId") + + # Happy case: All rules satisfied + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + resource.ExclusiveVar1 = "ONLY_EXCLUSIVE_VAL" # cannot have ExclusiveVar2, ExclusiveVar3 + resource.InclusiveVar1 = True + resource.InclusiveVar2 = False + resource.NestedSetting2 = {"NestedVar2": "NestedVar2"} + resource.NestedSetting1 = {"NestedVar1": "NestedVar1"} + resource.NestedSetting3 = {"NestedVar2": "NestedVar2"} + + # Should not raise any exception + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + + def test_conditional_requirement_rule(self): + """Test CONDITIONAL_REQUIREMENT validation rule""" + + class Resource(self.TestResource): + __validation_rules__ = [ + (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar2"]), + (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar3"]), + ( + self.ValidationRule.CONDITIONAL_REQUIREMENT, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], + ), + ] + + resource = Resource("TestId") + + # Test 1: Should show 3 errors + resource.ConditionalVar1 = 1 + resource.NestedSetting1 = {"NestedVar1": "NestedVar1"} + with self.assertRaises(InvalidResourceException) as error_1: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + self.assertEqual( + "Resource with id [TestId] is invalid. 'ConditionalVar1' requires 'ConditionalVar2'.\n'ConditionalVar1' requires 'ConditionalVar3'.\n'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", + error_1.exception.message, + ) + + # Test 2: should show 2 error + resource.ConditionalVar2 = 2 + with self.assertRaises(InvalidResourceException) as error_2: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + self.assertEqual( + "Resource with id [TestId] is invalid. 'ConditionalVar1' requires 'ConditionalVar3'.\n'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", + error_2.exception.message, + ) + + # Test 3: should show 1 error + resource.ConditionalVar3 = 3 + with self.assertRaises(InvalidResourceException) as error_3: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + self.assertEqual( + "Resource with id [TestId] is invalid. 'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", + error_3.exception.message, + ) + + def test_mutually_inclusive_rule(self): + """Test MUTUALLY_INCLUSIVE validation rule""" + + class Resource(self.TestResource): + __validation_rules__ = [ + # When InclusiveVar1 is specified, InclusiveVar2 and InclusiveVar3 should be present + (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "InclusiveVar2"]), + (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "InclusiveVar3"]), + # When NestedSetting1.NestedVar1 is specified, NestedSetting2.NestedVar2 should be present + ( + self.ValidationRule.MUTUALLY_INCLUSIVE, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], + ), + ] + + resource = Resource("TestId") + + # Test 1: When InclusiveVar1 is present, both InclusiveVar2 and InclusiveVar3 must also be present + resource.InclusiveVar1 = True + with self.assertRaises(InvalidResourceException) as error_1: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + expected_errors = [ + "Properties must be used together: InclusiveVar1 and InclusiveVar2.", + "Properties must be used together: InclusiveVar1 and InclusiveVar3.", + ] + for expected_error in expected_errors: + self.assertIn(expected_error, error_1.exception.message) + + # Test 2: When InclusiveVar2 is provided, only one error should remain + resource.InclusiveVar2 = True + with self.assertRaises(InvalidResourceException) as error_2: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + self.assertIn( + "Properties must be used together: InclusiveVar1 and InclusiveVar3.", error_2.exception.message + ) + + # Test 3: When all inclusive vars are provided, no error for inclusive vars + resource.InclusiveVar3 = True + # Should not raise exception for inclusive vars + resource.validate_before_transform(self.TestProperties) + + # Test 4: When NestedSetting1.NestedVar1 is specified, NestedSetting2.NestedVar2 should be present + resource.NestedSetting1 = {"NestedVar1": "AUTO"} + with self.assertRaises(InvalidResourceException) as error_4: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Properties must be used together: NestedSetting1.NestedVar1 and NestedSetting2.NestedVar2.", + error_4.exception.message, + ) + + # Test 5: When both nested properties are provided, no error + resource.NestedSetting2 = {"NestedVar2": "REQUIRED"} + # Should not raise exception + resource.validate_before_transform(self.TestProperties) + + def test_mutually_exclusive_rule(self): + """Test MUTUALLY_EXCLUSIVE validation rule""" + + class Resource(self.TestResource): + __validation_rules__ = [ + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar1", "ExclusiveVar2"]), + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar2", "ExclusiveVar3"]), + ( + self.ValidationRule.MUTUALLY_EXCLUSIVE, + ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1"], + ), + (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["NestedSetting1.NestedVar2", "ExclusiveVar3"]), + ] + + resource = Resource("TestId") + + # Test 1: Cannot have both ExclusiveVar1 and ExclusiveVar2 + resource.ExclusiveVar1 = "value1" + resource.ExclusiveVar2 = "value2" + with self.assertRaises(InvalidResourceException) as error_1: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together.", + error_1.exception.message, + ) + + # Test 2: Can have ExclusiveVar1 alone + resource.ExclusiveVar2 = None # Remove ExclusiveVar2 + # Should not raise exception + resource.validate_before_transform(self.TestProperties) + + # Test 3: Cannot have both ExclusiveVar2 and ExclusiveVar3 + resource.ExclusiveVar1 = None # Remove ExclusiveVar1 + resource.ExclusiveVar2 = "value2" + resource.ExclusiveVar3 = "value3" + with self.assertRaises(InvalidResourceException) as error_3: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together.", + error_3.exception.message, + ) + + # Test 4: Multiple exclusive violations - should show all errors + resource.ExclusiveVar1 = "value1" # This conflicts with ExclusiveVar2 + resource.ExclusiveVar2 = "value2" # This conflicts with both ExclusiveVar1 and ExclusiveVar3 + resource.ExclusiveVar3 = "value3" # This conflicts with ExclusiveVar2 + with self.assertRaises(InvalidResourceException) as error_4: + resource.validate_before_transform(self.TestProperties, collect_all_errors=True) + expected_errors = [ + "Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together.", + "Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together.", + ] + for expected_error in expected_errors: + self.assertIn(expected_error, error_4.exception.message) + + # Test 5: Cannot have both NestedSetting1.NestedVar1 and NestedSetting2.NestedVar1 + resource.ExclusiveVar1 = None # Clear previous conflicts + resource.ExclusiveVar2 = None + resource.ExclusiveVar3 = None + resource.NestedSetting1 = {"NestedVar1": "nested_value1"} + resource.NestedSetting2 = {"NestedVar1": "nested_value2"} + with self.assertRaises(InvalidResourceException) as error_5: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar1' and 'NestedSetting2.NestedVar1' together.", + error_5.exception.message, + ) + + # Test 6: Cannot have both NestedSetting1.NestedVar2 and ExclusiveVar3 + resource.NestedSetting2 = None # Remove previous conflict + resource.NestedSetting1 = {"NestedVar2": "nested_value2"} + resource.ExclusiveVar3 = "value3" + with self.assertRaises(InvalidResourceException) as error_6: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar2' and 'ExclusiveVar3' together.", + error_6.exception.message, + ) + + # Test 7: Can have nested properties that don't conflict + resource.ExclusiveVar3 = None # Remove conflict + resource.NestedSetting1 = {"NestedVar1": "nested_value1"} # This doesn't conflict with NestedVar2 + resource.NestedSetting2 = {"NestedVar2": "nested_value2"} # This doesn't conflict with NestedVar1 + # Should not raise exception + resource.validate_before_transform(self.TestProperties) + + class TestFormatAllErrors(TestCase): + """Test cases for _format_all_errors() method""" + + def setUp(self): + """Set up test fixtures for _format_all_errors tests""" + # Import from parent class + parent = TestSamResourceMacro() + parent.setUp() + self.ValidationRule = parent.ValidationRule + self.TestProperties = parent.TestProperties + self.TestSettingProperties = parent.TestSettingProperties + self.TestResource = parent.TestResource + + @pytest.mark.parametrize( + "mock_errors,expected_count,expected_messages", + [ + # Single type error + ( + [{"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}], + 1, + ["Property 'ConditionalVar1' value must be integer."], + ), + # Union type consolidation + ( + [ + {"loc": ("ExclusiveVar1",), "msg": "not a valid str", "type": "type_error.str"}, + {"loc": ("ExclusiveVar1",), "msg": "not a valid int", "type": "type_error.integer"}, + ], + 1, + ["Property 'ExclusiveVar1' value must be string or integer."], + ), + # Missing property error + ( + [{"loc": ("RequiredProperty",), "msg": "field required", "type": "value_error.missing"}], + 1, + ["Property 'RequiredProperty' is required."], + ), + # Invalid property error + ( + [{"loc": ("InvalidProperty",), "msg": "extra fields not permitted", "type": "value_error.extra"}], + 1, + ["Property 'InvalidProperty' is an invalid property."], + ), + # Nested property error + ( + [{"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}], + 1, + ["Property 'NestedSetting1.NestedVar1' value must be string."], + ), + # Multiple properties with errors + ( + [ + {"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}, + {"loc": ("ExclusiveVar1",), "msg": "field required", "type": "value_error.missing"}, + {"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}, + ], + 3, + [ + "Property 'ConditionalVar1' value must be integer.", + "Property 'ExclusiveVar1' is required.", + "Property 'NestedSetting1.NestedVar1' value must be string.", + ], + ), + # Fallback error formatting + ( + [{"loc": ("SomeProperty",), "msg": "Some Custom Error Message", "type": "custom_error"}], + 1, + ["Property 'SomeProperty' some custom error message."], + ), + ], + ) + def test_format_all_errors(self, mock_errors, expected_count, expected_messages): + """Test formatting various types of validation errors""" + resource = self.TestResource("TestId") + + formatted_errors = resource._format_all_errors(mock_errors) + self.assertEqual(len(formatted_errors), expected_count) + + for expected_message in expected_messages: + self.assertIn(expected_message, formatted_errors) From d5410b0dc8c87657103f69ecd951622639859082 Mon Sep 17 00:00:00 2001 From: Vichym Date: Fri, 22 Aug 2025 14:24:56 -0700 Subject: [PATCH 2/5] comment out validation rules for functions --- samtranslator/model/sam_resources.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/samtranslator/model/sam_resources.py b/samtranslator/model/sam_resources.py index 35c324d6b..573e196f2 100644 --- a/samtranslator/model/sam_resources.py +++ b/samtranslator/model/sam_resources.py @@ -254,13 +254,13 @@ class SamFunction(SamResourceMacro): } # Validation rules - __validation_rules__ = [ - # TODO: To enable these rules, we need to update translator test input/output files to property configure template - # to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state - # (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]), - # (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]), - # (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]), - ] + # TODO: To enable these rules, we need to update translator test input/output files to property configure template + # to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state + # __validation_rules__ = [ + # (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]), + # (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]), + # (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]), + # ] def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]: try: From b52c322b7cf20fbc3645f0444b05def318e7f05e Mon Sep 17 00:00:00 2001 From: Vichym Date: Fri, 22 Aug 2025 16:26:16 -0700 Subject: [PATCH 3/5] chore: ignore validator.oy from test coverage --- .coveragerc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.coveragerc b/.coveragerc index 22c75b771..3dfa48907 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,6 +4,8 @@ omit = # Schema tested by `make check-black` samtranslator/schema/* samtranslator/internal/schema_source/* + # Deprecated validator module + samtranslator/validator/validator.py [report] exclude_lines = pragma: no cover \ No newline at end of file From 7f854fb11fcf091e293ce4f03b1048fe78f4ae3f Mon Sep 17 00:00:00 2001 From: Vichym Date: Wed, 27 Aug 2025 01:11:00 -0700 Subject: [PATCH 4/5] address comments --- samtranslator/model/__init__.py | 89 +-- samtranslator/model/sam_resources.py | 20 +- samtranslator/validator/property_rule.py | 140 +++++ tests/test_model.py | 753 +++++++++++++---------- tests/validator/__init__.py | 1 + tests/validator/test_property_rule.py | 170 +++++ 6 files changed, 758 insertions(+), 415 deletions(-) create mode 100644 samtranslator/validator/property_rule.py create mode 100644 tests/validator/__init__.py create mode 100644 tests/validator/test_property_rule.py diff --git a/samtranslator/model/__init__.py b/samtranslator/model/__init__.py index 6b25838de..76867b11e 100644 --- a/samtranslator/model/__init__.py +++ b/samtranslator/model/__init__.py @@ -4,7 +4,6 @@ import re from abc import ABC, ABCMeta, abstractmethod from contextlib import suppress -from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar from samtranslator.compat import pydantic @@ -16,6 +15,7 @@ from samtranslator.model.tags.resource_tagging import get_tag_list from samtranslator.model.types import IS_DICT, IS_STR, PassThrough, Validator, any_type, is_type from samtranslator.plugins import LifeCycleEvents +from samtranslator.validator.property_rule import PropertyRules RT = TypeVar("RT", bound=pydantic.BaseModel) # return type @@ -395,8 +395,11 @@ def _format_all_errors(self, errors: List[Dict[str, Any]]) -> List[str]: # Multiple types - consolidate with union type_text = " or ".join(unique_types) result.append(f"Property '{path}' value must be {type_text}.") + elif len(unique_types) == 1: + # Single type - use type mapping + result.append(f"Property '{path}' value must be {unique_types[0]}.") else: - # Single or no types - format normally + # No types - format normally result.append(self._format_single_error(data["error"])) return result @@ -542,16 +545,6 @@ def to_cloudformation(self, **kwargs: Any) -> List[Any]: """ -class ValidationRule(Enum): - MUTUALLY_EXCLUSIVE = "mutually_exclusive" - MUTUALLY_INCLUSIVE = "mutually_inclusive" - CONDITIONAL_REQUIREMENT = "conditional_requirement" - - -# Simple tuple-based rules: (rule_type, [property_names]) -PropertyRule = Tuple[ValidationRule, List[str]] - - class SamResourceMacro(ResourceMacro, metaclass=ABCMeta): """ResourceMacro that specifically refers to SAM (AWS::Serverless::*) resources.""" @@ -579,6 +572,10 @@ class SamResourceMacro(ResourceMacro, metaclass=ABCMeta): # Aggregate list of all reserved tags _RESERVED_TAGS = [_SAM_KEY, _SAR_APP_KEY, _SAR_SEMVER_KEY] + def get_property_validation_rules(self) -> Optional[PropertyRules]: + """Override this method in child classes to provide PropertyRules validation.""" + return None + def get_resource_references(self, generated_cfn_resources, supported_resource_refs): # type: ignore[no-untyped-def] """ Constructs the list of supported resource references by going through the list of CFN resources generated @@ -687,71 +684,21 @@ def _check_tag(self, reserved_tag_name: str, tags: Dict[str, Any]) -> None: "input.", ) - def validate_before_transform(self, schema_class: Optional[Type[RT]], collect_all_errors: bool = False) -> None: - if not hasattr(self, "__validation_rules__"): + def validate_before_transform(self, schema_class: Type[RT]) -> None: + rules = self.get_property_validation_rules() + if rules is None: return - rules = self.__validation_rules__ - validated_model = ( - self.validate_properties_and_return_model(schema_class, collect_all_errors) if schema_class else None - ) - - error_messages = [] - - for rule_type, properties in rules: - present = [prop for prop in properties if self._get_property_value(prop, validated_model) is not None] - if rule_type == ValidationRule.MUTUALLY_EXCLUSIVE: - # Check if more than one property exists - if len(present) > 1: - prop_names = [f"'{p}'" for p in present] - error_messages.append(f"Cannot specify {self._combine_string(prop_names)} together.") - - elif rule_type == ValidationRule.MUTUALLY_INCLUSIVE: - # If any property in the group is present, then all properties in the group must be present - # Check if some but not all properties from the group are present - missing_some_properties = 0 < len(present) < len(properties) - if missing_some_properties: - error_messages.append(f"Properties must be used together: {self._combine_string(properties)}.") - - elif ( - rule_type == ValidationRule.CONDITIONAL_REQUIREMENT - and self._get_property_value(properties[0], validated_model) is not None - and self._get_property_value(properties[1], validated_model) is None - ): - # First property requires second property - error_messages.append(f"'{properties[0]}' requires '{properties[1]}'.") + # Validate properties and get model, then pass to rules + try: + validated_model = self.validate_properties_and_return_model(schema_class) + except Exception: + validated_model = None - # If there are any validation errors, raise a single exception with all error messages + error_messages = rules.validate_all(validated_model) if error_messages: raise InvalidResourceException(self.logical_id, "\n".join(error_messages)) - def _combine_string(self, words: List[str]) -> str: - return ", ".join(words[:-1]) + (" and " + words[-1] if len(words) > 1 else words[0] if words else "") - - def _get_property_value(self, prop: str, validated_model: Any = None) -> Any: - """Original property value getter. Supports nested properties with dot notation.""" - if "." not in prop: - # Simple property - use existing logic for direct attributes - return getattr(self, prop, None) - - # Nested property - use validated model - if validated_model is None: - return None - - try: - # Navigate through nested properties using dot notation - value = validated_model - for part in prop.split("."): - if hasattr(value, part): - value = getattr(value, part) - if value is None: - return None - else: - return None - return value - except Exception: - return None - class ResourceTypeResolver: """ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to diff --git a/samtranslator/model/sam_resources.py b/samtranslator/model/sam_resources.py index 573e196f2..95df0087f 100644 --- a/samtranslator/model/sam_resources.py +++ b/samtranslator/model/sam_resources.py @@ -253,14 +253,20 @@ class SamFunction(SamResourceMacro): "DestinationQueue": SQSQueue.resource_type, } - # Validation rules + # def get_property_validation_rules(self) -> Optional[PropertyRules]: + # """Override to provide PropertyRules validation for SAM Function.""" # TODO: To enable these rules, we need to update translator test input/output files to property configure template # to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state - # __validation_rules__ = [ - # (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]), - # (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]), - # (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]), - # ] + # from samtranslator.internal.schema_source.aws_serverless_function import Properties as FunctionProperties + # return (PropertyRules(FunctionProperties) + # .addMutuallyExclusive("ImageUri", "InlineCode", "CodeUri") + # .addConditionalInclusive("DeploymentPreference", ["AutoPublishAlias"]) + # .addConditionalInclusive("ProvisionedConcurrencyConfig", ["AutoPublishAlias"]) + # .addConditionalInclusive("PackageType=Zip", ["Runtime", "Handler"]) + # .addConditionalInclusive("PackageType=Image", ["ImageUri"]) + # .addConditionalExclusive("PackageType=Zip", ["ImageUri", "ImageConfig"]) + # .addConditionalExclusive("PackageType=Image", ["Runtime", "Handler", "Layers"])) + # return None def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]: try: @@ -287,7 +293,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P # TODO: Skip pass schema_class=aws_serverless_function.Properties to skip schema validation for now. # - adding this now would required update error message in error error_function_*_test.py # - add this when we can verify that changing error message would not break customers - # self.validate_before_transform(schema_class=None, collect_all_errors=False) + # self.validate_before_transform(schema_class=aws_serverless_function.Properties) if self.DeadLetterQueue: self._validate_dlq(self.DeadLetterQueue) diff --git a/samtranslator/validator/property_rule.py b/samtranslator/validator/property_rule.py new file mode 100644 index 000000000..23ab5f6df --- /dev/null +++ b/samtranslator/validator/property_rule.py @@ -0,0 +1,140 @@ +from enum import Enum +from typing import Any, List, Optional, Tuple, TypeVar + +RT = TypeVar("RT") + + +class RuleType(Enum): + MUTUALLY_EXCLUSIVE = "mutually_exclusive" + CONDITIONAL_EXCLUSIVE = "conditional_exclusive" + MUTUALLY_INCLUSIVE = "mutually_inclusive" + CONDITIONAL_INCLUSIVE = "conditional_inclusive" + + +class PropertyRules: + def __init__(self) -> None: + self._rules: List[Tuple[RuleType, List[str], List[str]]] = [] + + def addMutuallyExclusive(self, *props: str) -> "PropertyRules": + self._rules.append((RuleType.MUTUALLY_EXCLUSIVE, list(props), [])) + return self + + def addConditionalExclusive(self, source_prop: str, target_props: List[str]) -> "PropertyRules": + self._rules.append((RuleType.CONDITIONAL_EXCLUSIVE, [source_prop], target_props)) + return self + + def addMutuallyInclusive(self, *props: str) -> "PropertyRules": + self._rules.append((RuleType.MUTUALLY_INCLUSIVE, list(props), [])) + return self + + def addConditionalInclusive(self, source_prop: str, target_props: List[str]) -> "PropertyRules": + self._rules.append((RuleType.CONDITIONAL_INCLUSIVE, [source_prop], target_props)) + return self + + def validate_all(self, validated_model: Optional[RT]) -> List[str]: + if validated_model is None: + return [] + + errors = [] + for rule_type, source_props, target_props in self._rules: + # Validate property names during validation + all_props = source_props + target_props + for prop in all_props: + # Skip validation for value-based conditions (PropertyName=Value) + if "=" in prop: + prop_name = prop.split("=", 1)[0] + if "." not in prop_name and not hasattr(validated_model, prop_name): + errors.append(f"Property '{prop_name}' does not exist in the schema") + continue + elif "." not in prop and not hasattr(validated_model, prop): + errors.append(f"Property '{prop}' does not exist in the schema") + continue + + error = self._validate_rule(validated_model, rule_type, source_props, target_props) + if error: + errors.append(error) + return errors + + def _validate_rule( + self, validated_model: RT, rule_type: RuleType, source_props: List[str], target_props: List[str] + ) -> Optional[str]: + if rule_type == RuleType.MUTUALLY_EXCLUSIVE: + return self._validate_mutually_exclusive(validated_model, source_props) + if rule_type == RuleType.CONDITIONAL_EXCLUSIVE: + return self._validate_conditional_exclusive(validated_model, source_props, target_props) + if rule_type == RuleType.MUTUALLY_INCLUSIVE: + return self._validate_mutually_inclusive(validated_model, source_props) + if rule_type == RuleType.CONDITIONAL_INCLUSIVE: + return self._validate_conditional_inclusive(validated_model, source_props, target_props) + return None + + def _validate_mutually_exclusive(self, validated_model: RT, source_props: List[str]) -> Optional[str]: + present = [prop for prop in source_props if self._get_property_value(validated_model, prop) is not None] + if len(present) > 1: + present_props = " and ".join(f"'{p}'" for p in present) + return f"Cannot specify {present_props} together." + return None + + def _validate_conditional_exclusive( + self, validated_model: RT, source_props: List[str], target_props: List[str] + ) -> Optional[str]: + source_present = any(self._check_property_condition(validated_model, prop) for prop in source_props) + if source_present: + conflicting = [prop for prop in target_props if self._check_property_condition(validated_model, prop)] + if conflicting: + conflicting_props = ", ".join(f"'{p}'" for p in conflicting) + return f"'{source_props[0]}' cannot be used with {conflicting_props}." + return None + + def _validate_mutually_inclusive(self, validated_model: RT, source_props: List[str]) -> Optional[str]: + present = [prop for prop in source_props if self._get_property_value(validated_model, prop) is not None] + if 0 < len(present) < len(source_props): + missing = [prop for prop in source_props if prop not in present] + missing_props = ", ".join(f"'{p}'" for p in missing) + present_props = ", ".join(f"'{p}'" for p in present) + return f"When using {present_props}, you must also specify {missing_props}." + return None + + def _validate_conditional_inclusive( + self, validated_model: RT, source_props: List[str], target_props: List[str] + ) -> Optional[str]: + source_present = any(self._check_property_condition(validated_model, prop) for prop in source_props) + if source_present: + missing = [prop for prop in target_props if not self._check_property_condition(validated_model, prop)] + if missing: + missing_props = ", ".join(f"'{p}'" for p in missing) + return f"'{source_props[0]}' requires all of: {missing_props}." + return None + + def _get_property_value(self, validated_model: RT, prop: str) -> Any: + if "." not in prop: + return getattr(validated_model, prop, None) + + # Handle nested properties recursively + try: + first_part, rest = prop.split(".", 1) + + # Get the first level value + if hasattr(validated_model, first_part): + value = getattr(validated_model, first_part) + elif isinstance(validated_model, dict) and first_part in validated_model: + value = validated_model[first_part] + else: + return None + + # If value is None, return None + if value is None: + return None + + # Recursively get the rest of the property path + return self._get_property_value(value, rest) + except Exception: + return None + + def _check_property_condition(self, validated_model: RT, prop: str) -> bool: + """Check if property condition is met. Supports 'PropertyName=Value' syntax.""" + if "=" in prop: + prop_name, expected_value = prop.split("=", 1) + actual_value = self._get_property_value(validated_model, prop_name) + return actual_value is not None and str(actual_value) == expected_value + return self._get_property_value(validated_model, prop) is not None diff --git a/tests/test_model.py b/tests/test_model.py index 9b78fed9f..a506e42cf 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -3,12 +3,14 @@ from unittest.mock import Mock import pytest +from parameterized import parameterized from samtranslator.internal.schema_source.common import BaseModel from samtranslator.intrinsics.resource_refs import SupportedResourceReferences -from samtranslator.model import PropertyType, Resource, ResourceTypeResolver, SamResourceMacro, ValidationRule +from samtranslator.model import PropertyType, Resource, ResourceTypeResolver, SamResourceMacro from samtranslator.model.exceptions import InvalidResourceException from samtranslator.model.types import IS_BOOL, IS_DICT, IS_INT, IS_STR from samtranslator.plugins import LifeCycleEvents +from samtranslator.validator.property_rule import PropertyRules def valid_if_true(value, should_raise=True): @@ -398,32 +400,18 @@ class MyResource(Resource): ) -class TestSamResourceMacro(TestCase): - """ - Test validation functionality in SamResourceMacro +# Restructured test classes for better organization +class SamResourceMacroTestBase: + """Shared test fixtures for SamResourceMacro tests""" - IMPORTANT: These tests document the CURRENT BEHAVIOR of the validation system, - which has known bugs. The validation logic incorrectly treats property conditions - that evaluate to False as "present" for validation purposes. - - Known Issues: - 1. Property conditions like "Type=CUSTOM" that evaluate to False are still - considered "present" because False != None - 2. Boolean comparisons fail because True != "True" (string comparison) - 3. Integer comparisons fail because 42 != "42" (string comparison) - 4. Validation rules trigger even when their conditions don't match - """ - - def setUp(self): + @classmethod + def setup_test_fixtures(cls): """Set up common test fixtures""" - self.ValidationRule = ValidationRule - # Test setting properties using BaseModel from samtranslator.internal.schema_source.common class TestSettingProperties(BaseModel): NestedVar1: Optional[str] = None NestedVar2: Optional[str] = None - # Comprehensive schema class for testing using BaseModel from samtranslator.internal.schema_source.common class TestProperties(BaseModel): ConditionalVar1: Optional[int] = None ConditionalVar2: Optional[int] = None @@ -438,10 +426,6 @@ class TestProperties(BaseModel): NestedSetting2: Optional[TestSettingProperties] = None NestedSetting3: Optional[TestSettingProperties] = None - self.TestProperties = TestProperties - self.TestSettingProperties = TestSettingProperties - - # Reusable test resource class extending SamResourceMacro class TestResource(SamResourceMacro): resource_type = "Test::Resource" property_types = { @@ -462,327 +446,422 @@ class TestResource(SamResourceMacro): def to_cloudformation(self, **kwargs): return [] - self.TestResource = TestResource - - class TestValidateBeforeTransform(TestCase): - """Test cases for validate_before_transform() method""" - - def setUp(self): - """Set up test fixtures for validate_before_transform tests""" - # Import from parent class - parent = TestSamResourceMacro() - parent.setUp() - self.ValidationRule = parent.ValidationRule - self.TestProperties = parent.TestProperties - self.TestSettingProperties = parent.TestSettingProperties - self.TestResource = parent.TestResource - - def test_validate_before_transform_all_rules_pass(self): - """Test successful validation when all rules are satisfied""" - - class Resource(self.TestResource): - __validation_rules__ = [ - (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar2"]), - (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar2", "ConditionalVar3"]), - ( - self.ValidationRule.CONDITIONAL_REQUIREMENT, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], - ), - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar1", "ExclusiveVar2"]), - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar2", "ExclusiveVar3"]), - ( - self.ValidationRule.MUTUALLY_EXCLUSIVE, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1"], - ), - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["NestedSetting1.NestedVar2", "ExclusiveVar3"]), - (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "ExclusiveVar1"]), - ( - self.ValidationRule.MUTUALLY_INCLUSIVE, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], - ), - ( - self.ValidationRule.MUTUALLY_INCLUSIVE, - ["NestedSetting2.NestedVar2", "NestedSetting3.NestedVar2"], - ), - ] - - resource = Resource("TestId") - - # Happy case: All rules satisfied - resource.ConditionalVar1 = 1 - resource.ConditionalVar2 = 2 - resource.ConditionalVar3 = 3 - resource.ExclusiveVar1 = "ONLY_EXCLUSIVE_VAL" # cannot have ExclusiveVar2, ExclusiveVar3 - resource.InclusiveVar1 = True - resource.InclusiveVar2 = False - resource.NestedSetting2 = {"NestedVar2": "NestedVar2"} - resource.NestedSetting1 = {"NestedVar1": "NestedVar1"} - resource.NestedSetting3 = {"NestedVar2": "NestedVar2"} - - # Should not raise any exception - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - - def test_conditional_requirement_rule(self): - """Test CONDITIONAL_REQUIREMENT validation rule""" - - class Resource(self.TestResource): - __validation_rules__ = [ - (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar2"]), - (self.ValidationRule.CONDITIONAL_REQUIREMENT, ["ConditionalVar1", "ConditionalVar3"]), - ( - self.ValidationRule.CONDITIONAL_REQUIREMENT, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], - ), - ] - - resource = Resource("TestId") - - # Test 1: Should show 3 errors - resource.ConditionalVar1 = 1 - resource.NestedSetting1 = {"NestedVar1": "NestedVar1"} - with self.assertRaises(InvalidResourceException) as error_1: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - self.assertEqual( - "Resource with id [TestId] is invalid. 'ConditionalVar1' requires 'ConditionalVar2'.\n'ConditionalVar1' requires 'ConditionalVar3'.\n'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", - error_1.exception.message, - ) - - # Test 2: should show 2 error - resource.ConditionalVar2 = 2 - with self.assertRaises(InvalidResourceException) as error_2: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - self.assertEqual( - "Resource with id [TestId] is invalid. 'ConditionalVar1' requires 'ConditionalVar3'.\n'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", - error_2.exception.message, - ) - - # Test 3: should show 1 error - resource.ConditionalVar3 = 3 - with self.assertRaises(InvalidResourceException) as error_3: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - self.assertEqual( - "Resource with id [TestId] is invalid. 'NestedSetting1.NestedVar1' requires 'NestedSetting2.NestedVar2'.", - error_3.exception.message, - ) - - def test_mutually_inclusive_rule(self): - """Test MUTUALLY_INCLUSIVE validation rule""" - - class Resource(self.TestResource): - __validation_rules__ = [ - # When InclusiveVar1 is specified, InclusiveVar2 and InclusiveVar3 should be present - (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "InclusiveVar2"]), - (self.ValidationRule.MUTUALLY_INCLUSIVE, ["InclusiveVar1", "InclusiveVar3"]), - # When NestedSetting1.NestedVar1 is specified, NestedSetting2.NestedVar2 should be present - ( - self.ValidationRule.MUTUALLY_INCLUSIVE, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2"], - ), - ] - - resource = Resource("TestId") - - # Test 1: When InclusiveVar1 is present, both InclusiveVar2 and InclusiveVar3 must also be present - resource.InclusiveVar1 = True - with self.assertRaises(InvalidResourceException) as error_1: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - expected_errors = [ - "Properties must be used together: InclusiveVar1 and InclusiveVar2.", - "Properties must be used together: InclusiveVar1 and InclusiveVar3.", - ] - for expected_error in expected_errors: - self.assertIn(expected_error, error_1.exception.message) - - # Test 2: When InclusiveVar2 is provided, only one error should remain - resource.InclusiveVar2 = True - with self.assertRaises(InvalidResourceException) as error_2: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - self.assertIn( - "Properties must be used together: InclusiveVar1 and InclusiveVar3.", error_2.exception.message - ) - - # Test 3: When all inclusive vars are provided, no error for inclusive vars - resource.InclusiveVar3 = True - # Should not raise exception for inclusive vars + return TestProperties, TestSettingProperties, TestResource + + +class TestSamResourceMacroValidation(TestCase): + """Test validate_before_transform functionality with restructured approach""" + + def setUp(self): + self.TestProperties, self.TestSettingProperties, self.TestResource = ( + SamResourceMacroTestBase.setup_test_fixtures() + ) + + def test_validate_before_transform_simple_rules_pass(self): + """Test successful validation with simple rules""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addConditionalInclusive("ConditionalVar1", ["ConditionalVar2"]) + .addMutuallyExclusive("ExclusiveVar1", "ExclusiveVar2") + .addMutuallyInclusive("InclusiveVar1", "InclusiveVar2") + ) + + resource = Resource("TestId") + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ExclusiveVar1 = "value1" + resource.InclusiveVar1 = True + resource.InclusiveVar2 = True + + # Should not raise exception + resource.validate_before_transform(self.TestProperties) + + def test_validate_before_transform_complex_rules_pass(self): + """Test successful validation when all rules are satisfied""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addConditionalInclusive("ConditionalVar1", ["ConditionalVar2"]) + .addConditionalInclusive("ConditionalVar2", ["ConditionalVar3"]) + .addConditionalInclusive("NestedSetting1.NestedVar1", ["NestedSetting2.NestedVar2"]) + .addMutuallyExclusive("ExclusiveVar1", "ExclusiveVar2") + .addMutuallyExclusive("ExclusiveVar2", "ExclusiveVar3") + .addMutuallyExclusive("NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1") + .addMutuallyExclusive("NestedSetting1.NestedVar2", "ExclusiveVar3") + .addMutuallyInclusive("InclusiveVar1", "ExclusiveVar1") + .addMutuallyInclusive("NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2") + .addMutuallyInclusive("NestedSetting2.NestedVar2", "NestedSetting3.NestedVar2") + ) + + resource = Resource("TestId") + + # Happy case: All rules satisfied + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + resource.ExclusiveVar1 = "ONLY_EXCLUSIVE_VAL" + resource.InclusiveVar1 = True + resource.InclusiveVar2 = False + resource.NestedSetting2 = {"NestedVar2": "NestedVar2"} + resource.NestedSetting1 = {"NestedVar1": "NestedVar1"} + resource.NestedSetting3 = {"NestedVar2": "NestedVar2"} + + # Should not raise any exception + resource.validate_before_transform(self.TestProperties) + + def test_validate_before_transform_conditional_inclusive(self): + """Test CONDITIONAL_INCLUSIVE validation rule""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addConditionalInclusive("ConditionalVar1=1", ["ConditionalVar2", "ConditionalVar3"]) + .addConditionalInclusive( + "NestedSetting1.NestedVar1=test", ["NestedSetting2.NestedVar2=22", "NestedSetting2.NestedVar1"] + ) + ) + + resource = Resource("TestId") + + # Test 1: When ConditionalVar1=1, ConditionalVar2 and ConditionalVar3 should be present + resource.ConditionalVar1 = 1 + resource.NestedSetting1 = {"NestedVar1": "test"} + with self.assertRaises(InvalidResourceException) as error_1: resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. 'ConditionalVar1=1' requires all of: 'ConditionalVar2', 'ConditionalVar3'.\n'NestedSetting1.NestedVar1=test' requires all of: 'NestedSetting2.NestedVar2=22', 'NestedSetting2.NestedVar1'.", + error_1.exception.message, + ) - # Test 4: When NestedSetting1.NestedVar1 is specified, NestedSetting2.NestedVar2 should be present - resource.NestedSetting1 = {"NestedVar1": "AUTO"} - with self.assertRaises(InvalidResourceException) as error_4: - resource.validate_before_transform(self.TestProperties) - self.assertEqual( - "Resource with id [TestId] is invalid. Properties must be used together: NestedSetting1.NestedVar1 and NestedSetting2.NestedVar2.", - error_4.exception.message, - ) - - # Test 5: When both nested properties are provided, no error - resource.NestedSetting2 = {"NestedVar2": "REQUIRED"} - # Should not raise exception + # Test 2: Add ConditionalVar2, should still fail for ConditionalVar3 + resource.ConditionalVar2 = 2 + with self.assertRaises(InvalidResourceException) as error_2: resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. 'ConditionalVar1=1' requires all of: 'ConditionalVar3'.\n'NestedSetting1.NestedVar1=test' requires all of: 'NestedSetting2.NestedVar2=22', 'NestedSetting2.NestedVar1'.", + error_2.exception.message, + ) + + # Test 3: Add ConditionalVar3, should still fail for NestedSetting2.NestedVar2 + resource.ConditionalVar3 = 3 + with self.assertRaises(InvalidResourceException) as error_3: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. 'NestedSetting1.NestedVar1=test' requires all of: 'NestedSetting2.NestedVar2=22', 'NestedSetting2.NestedVar1'.", + error_3.exception.message, + ) - def test_mutually_exclusive_rule(self): - """Test MUTUALLY_EXCLUSIVE validation rule""" - - class Resource(self.TestResource): - __validation_rules__ = [ - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar1", "ExclusiveVar2"]), - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["ExclusiveVar2", "ExclusiveVar3"]), - ( - self.ValidationRule.MUTUALLY_EXCLUSIVE, - ["NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1"], - ), - (self.ValidationRule.MUTUALLY_EXCLUSIVE, ["NestedSetting1.NestedVar2", "ExclusiveVar3"]), - ] - - resource = Resource("TestId") - - # Test 1: Cannot have both ExclusiveVar1 and ExclusiveVar2 - resource.ExclusiveVar1 = "value1" - resource.ExclusiveVar2 = "value2" - with self.assertRaises(InvalidResourceException) as error_1: - resource.validate_before_transform(self.TestProperties) - self.assertEqual( - "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together.", - error_1.exception.message, - ) - - # Test 2: Can have ExclusiveVar1 alone - resource.ExclusiveVar2 = None # Remove ExclusiveVar2 - # Should not raise exception + # Test 4: Add NestedSetting2.NestedVar2=22, should still fail for NestedSetting2.NestedVar1=12 + resource.NestedSetting2 = {"NestedVar2": 22} + with self.assertRaises(InvalidResourceException) as error_4: resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. 'NestedSetting1.NestedVar1=test' requires all of: 'NestedSetting2.NestedVar1'.", + error_4.exception.message, + ) - # Test 3: Cannot have both ExclusiveVar2 and ExclusiveVar3 - resource.ExclusiveVar1 = None # Remove ExclusiveVar1 - resource.ExclusiveVar2 = "value2" - resource.ExclusiveVar3 = "value3" - with self.assertRaises(InvalidResourceException) as error_3: - resource.validate_before_transform(self.TestProperties) - self.assertEqual( - "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together.", - error_3.exception.message, - ) - - # Test 4: Multiple exclusive violations - should show all errors - resource.ExclusiveVar1 = "value1" # This conflicts with ExclusiveVar2 - resource.ExclusiveVar2 = "value2" # This conflicts with both ExclusiveVar1 and ExclusiveVar3 - resource.ExclusiveVar3 = "value3" # This conflicts with ExclusiveVar2 - with self.assertRaises(InvalidResourceException) as error_4: - resource.validate_before_transform(self.TestProperties, collect_all_errors=True) - expected_errors = [ - "Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together.", - "Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together.", - ] - for expected_error in expected_errors: - self.assertIn(expected_error, error_4.exception.message) - - # Test 5: Cannot have both NestedSetting1.NestedVar1 and NestedSetting2.NestedVar1 - resource.ExclusiveVar1 = None # Clear previous conflicts - resource.ExclusiveVar2 = None - resource.ExclusiveVar3 = None - resource.NestedSetting1 = {"NestedVar1": "nested_value1"} - resource.NestedSetting2 = {"NestedVar1": "nested_value2"} - with self.assertRaises(InvalidResourceException) as error_5: - resource.validate_before_transform(self.TestProperties) - self.assertEqual( - "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar1' and 'NestedSetting2.NestedVar1' together.", - error_5.exception.message, - ) - - # Test 6: Cannot have both NestedSetting1.NestedVar2 and ExclusiveVar3 - resource.NestedSetting2 = None # Remove previous conflict - resource.NestedSetting1 = {"NestedVar2": "nested_value2"} - resource.ExclusiveVar3 = "value3" - with self.assertRaises(InvalidResourceException) as error_6: - resource.validate_before_transform(self.TestProperties) - self.assertEqual( - "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar2' and 'ExclusiveVar3' together.", - error_6.exception.message, - ) - - # Test 7: Can have nested properties that don't conflict - resource.ExclusiveVar3 = None # Remove conflict - resource.NestedSetting1 = {"NestedVar1": "nested_value1"} # This doesn't conflict with NestedVar2 - resource.NestedSetting2 = {"NestedVar2": "nested_value2"} # This doesn't conflict with NestedVar1 - # Should not raise exception + # Test 5: Set NestedSetting2.NestedVar1, should pass + resource.NestedSetting2 = {"NestedVar2": 22, "NestedVar1": 12} + resource.validate_before_transform(self.TestProperties) + + # Test 6: When ConditionalVar1 is None/unset, conditional rules should not apply + resource.ConditionalVar1 = None + resource.ConditionalVar2 = None + resource.ConditionalVar3 = None + resource.NestedSetting1 = None + resource.NestedSetting2 = None + resource.validate_before_transform(self.TestProperties) # Should pass + + # Test 7: When ConditionalVar1 != 1, rules should not apply (value-specific behavior) + resource.ConditionalVar1 = 2 # Different value than 1 + resource.ConditionalVar2 = None # Missing target properties + resource.ConditionalVar3 = None + resource.validate_before_transform(self.TestProperties) # Should pass + + # Test 8: When NestedSetting1.NestedVar1 != "test", nested rule should not apply + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + resource.NestedSetting1 = {"NestedVar1": "different"} # Different value than "test" + resource.NestedSetting2 = None # Missing target property + resource.validate_before_transform(self.TestProperties) # Should pass + + # Test 9: All conditions are met - should pass + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + resource.NestedSetting1 = {"NestedVar1": "test"} + resource.NestedSetting2 = {"NestedVar2": 22, "NestedVar1": 12} + resource.validate_before_transform(self.TestProperties) # Should pass + + def test_validate_before_transform_conditional_exclusive(self): + """Test CONDITIONAL_EXCLUSIVE validation rule""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addConditionalExclusive("ConditionalVar1=1", ["ConditionalVar2", "ConditionalVar3"]) + .addConditionalExclusive( + "NestedSetting1.NestedVar1=test", ["NestedSetting2.NestedVar2=22", "NestedSetting2.NestedVar1"] + ) + ) + + resource = Resource("TestId") + + # Test 1: When ConditionalVar1=1, ConditionalVar2 and ConditionalVar3 should NOT be present + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + with self.assertRaises(InvalidResourceException) as error_1: resource.validate_before_transform(self.TestProperties) + self.assertIn( + "'ConditionalVar1=1' cannot be used with 'ConditionalVar2', 'ConditionalVar3'", error_1.exception.message + ) + + # Test 2: Remove conflicting properties, should pass + resource.ConditionalVar2 = None + resource.ConditionalVar3 = None + resource.validate_before_transform(self.TestProperties) - class TestFormatAllErrors(TestCase): - """Test cases for _format_all_errors() method""" - - def setUp(self): - """Set up test fixtures for _format_all_errors tests""" - # Import from parent class - parent = TestSamResourceMacro() - parent.setUp() - self.ValidationRule = parent.ValidationRule - self.TestProperties = parent.TestProperties - self.TestSettingProperties = parent.TestSettingProperties - self.TestResource = parent.TestResource - - @pytest.mark.parametrize( - "mock_errors,expected_count,expected_messages", - [ - # Single type error - ( - [{"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}], - 1, - ["Property 'ConditionalVar1' value must be integer."], - ), - # Union type consolidation - ( - [ - {"loc": ("ExclusiveVar1",), "msg": "not a valid str", "type": "type_error.str"}, - {"loc": ("ExclusiveVar1",), "msg": "not a valid int", "type": "type_error.integer"}, - ], - 1, - ["Property 'ExclusiveVar1' value must be string or integer."], - ), - # Missing property error - ( - [{"loc": ("RequiredProperty",), "msg": "field required", "type": "value_error.missing"}], - 1, - ["Property 'RequiredProperty' is required."], - ), - # Invalid property error - ( - [{"loc": ("InvalidProperty",), "msg": "extra fields not permitted", "type": "value_error.extra"}], - 1, - ["Property 'InvalidProperty' is an invalid property."], - ), - # Nested property error - ( - [{"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}], - 1, - ["Property 'NestedSetting1.NestedVar1' value must be string."], - ), - # Multiple properties with errors - ( - [ - {"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}, - {"loc": ("ExclusiveVar1",), "msg": "field required", "type": "value_error.missing"}, - {"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}, - ], - 3, - [ - "Property 'ConditionalVar1' value must be integer.", - "Property 'ExclusiveVar1' is required.", - "Property 'NestedSetting1.NestedVar1' value must be string.", - ], - ), - # Fallback error formatting - ( - [{"loc": ("SomeProperty",), "msg": "Some Custom Error Message", "type": "custom_error"}], - 1, - ["Property 'SomeProperty' some custom error message."], - ), - ], + # Test 3: When NestedSetting1.NestedVar1="test", NestedSetting2.NestedVar2 should NOT be present + resource.NestedSetting1 = {"NestedVar1": "test"} + resource.NestedSetting2 = {"NestedVar2": "value2"} + with self.assertRaises(InvalidResourceException) as error_3: + resource.validate_before_transform(self.TestProperties) + self.assertIn( + "'NestedSetting1.NestedVar1=test' cannot be used with 'NestedSetting2.NestedVar2'", + error_3.exception.message, ) - def test_format_all_errors(self, mock_errors, expected_count, expected_messages): - """Test formatting various types of validation errors""" - resource = self.TestResource("TestId") - formatted_errors = resource._format_all_errors(mock_errors) - self.assertEqual(len(formatted_errors), expected_count) + # Test 4: Remove NestedSetting2.NestedVar2, should pass + resource.NestedSetting2 = None + resource.validate_before_transform(self.TestProperties) + + # Test 5: When ConditionalVar1 is None/unset, conditional rules should not apply + resource.ConditionalVar1 = None + resource.ConditionalVar2 = 2 + resource.ConditionalVar3 = 3 + resource.NestedSetting1 = None + resource.NestedSetting2 = {"NestedVar2": "value2"} + resource.validate_before_transform(self.TestProperties) # Should pass + + # Test 6: When ConditionalVar1 != 1, rules should not apply (value-specific behavior) + resource.ConditionalVar1 = 2 # Different value than 1 + resource.ConditionalVar2 = 2 # Conflicting properties present + resource.ConditionalVar3 = 3 + resource.validate_before_transform(self.TestProperties) # Should pass + + # Test 7: When NestedSetting1.NestedVar1 != "test", nested rule should not apply + resource.ConditionalVar1 = 1 + resource.ConditionalVar2 = None + resource.ConditionalVar3 = None + resource.NestedSetting1 = {"NestedVar1": "different"} # Different value than "test" + resource.NestedSetting2 = {"NestedVar2": "value2"} # Conflicting property present + resource.validate_before_transform(self.TestProperties) # Should pass + + def test_validate_before_transform_mutually_inclusive(self): + """Test MUTUALLY_INCLUSIVE validation rule""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addMutuallyInclusive("InclusiveVar1", "InclusiveVar2") + .addMutuallyInclusive("InclusiveVar1", "InclusiveVar3") + .addMutuallyInclusive("NestedSetting1.NestedVar1", "NestedSetting2.NestedVar2") + ) + + resource = Resource("TestId") + + # Test 1: When InclusiveVar1 is specified, InclusiveVar2 and InclusiveVar3 should be present + resource.InclusiveVar1 = True + with self.assertRaises(InvalidResourceException) as error_1: + resource.validate_before_transform(self.TestProperties) + expected_errors = [ + "When using 'InclusiveVar1', you must also specify 'InclusiveVar2'", + "When using 'InclusiveVar1', you must also specify 'InclusiveVar3'", + ] + for expected_error in expected_errors: + self.assertIn(expected_error, error_1.exception.message) + + # Test 2: Add InclusiveVar2, should still fail for InclusiveVar3 + resource.InclusiveVar2 = True + with self.assertRaises(InvalidResourceException) as error_2: + resource.validate_before_transform(self.TestProperties) + self.assertIn("When using 'InclusiveVar1', you must also specify 'InclusiveVar3'", error_2.exception.message) + + # Test 3: Add InclusiveVar3, should pass for inclusive vars + resource.InclusiveVar3 = True + resource.validate_before_transform(self.TestProperties) + + # Test 4: When NestedSetting1.NestedVar1 is specified, NestedSetting2.NestedVar2 should be present + resource.NestedSetting1 = {"NestedVar1": "AUTO"} + with self.assertRaises(InvalidResourceException) as error_4: + resource.validate_before_transform(self.TestProperties) + self.assertIn( + "When using 'NestedSetting1.NestedVar1', you must also specify 'NestedSetting2.NestedVar2'", + error_4.exception.message, + ) + + # Test 5: Add NestedSetting2.NestedVar2, should pass + resource.NestedSetting2 = {"NestedVar2": "REQUIRED"} + resource.validate_before_transform(self.TestProperties) + + def test_validate_before_transform_mutually_exclusive(self): + """Test MUTUALLY_EXCLUSIVE validation rule""" + + class Resource(self.TestResource): + def get_property_validation_rules(self): + return ( + PropertyRules() + .addMutuallyExclusive("ExclusiveVar1", "ExclusiveVar2") + .addMutuallyExclusive("ExclusiveVar2", "ExclusiveVar3") + .addMutuallyExclusive("NestedSetting1.NestedVar1", "NestedSetting2.NestedVar1") + .addMutuallyExclusive("NestedSetting1.NestedVar2", "ExclusiveVar3") + ) + + resource = Resource("TestId") + + # Test 1: Cannot have both ExclusiveVar1 and ExclusiveVar2 + resource.ExclusiveVar1 = "value1" + resource.ExclusiveVar2 = "value2" + with self.assertRaises(InvalidResourceException) as error_1: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together.", + error_1.exception.message, + ) + + # Test 2: Remove ExclusiveVar1, should pass + resource.ExclusiveVar1 = None + resource.ExclusiveVar2 = "value2" + resource.validate_before_transform(self.TestProperties) + + # Test 3: Cannot have both ExclusiveVar2 and ExclusiveVar3 + resource.ExclusiveVar2 = "value2" + resource.ExclusiveVar3 = "value3" + with self.assertRaises(InvalidResourceException) as error_3: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together.", + error_3.exception.message, + ) + + # Test 4: Test multiple conflicts + resource.ExclusiveVar1 = "value1" + resource.ExclusiveVar2 = "value2" + resource.ExclusiveVar3 = "value3" + with self.assertRaises(InvalidResourceException) as error_4: + resource.validate_before_transform(self.TestProperties) + expected_errors = [ + "Cannot specify 'ExclusiveVar1' and 'ExclusiveVar2' together", + "Cannot specify 'ExclusiveVar2' and 'ExclusiveVar3' together", + ] + for expected_error in expected_errors: + self.assertIn(expected_error, error_4.exception.message) + + # Test 5: Test nested property exclusions + resource.ExclusiveVar1 = None + resource.ExclusiveVar2 = None + resource.ExclusiveVar3 = None + resource.NestedSetting1 = {"NestedVar1": "nested_value1"} + resource.NestedSetting2 = {"NestedVar1": "nested_value2"} + with self.assertRaises(InvalidResourceException) as error_5: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar1' and 'NestedSetting2.NestedVar1' together.", + error_5.exception.message, + ) + + # Test 6: Test nested vs top-level exclusion + resource.NestedSetting2 = {"NestedVar1": None} + resource.NestedSetting1 = {"NestedVar2": "nested_value1"} + resource.ExclusiveVar3 = "value3" + with self.assertRaises(InvalidResourceException) as error_6: + resource.validate_before_transform(self.TestProperties) + self.assertEqual( + "Resource with id [TestId] is invalid. Cannot specify 'NestedSetting1.NestedVar2' and 'ExclusiveVar3' together.", + error_6.exception.message, + ) - for expected_message in expected_messages: - self.assertIn(expected_message, formatted_errors) + # Test 7: Clean state should pass + resource.ExclusiveVar1 = None + resource.ExclusiveVar2 = None + resource.ExclusiveVar3 = None + resource.NestedSetting1 = {"NestedVar1": "nested_value1"} + resource.NestedSetting2 = {"NestedVar2": "nested_value2"} + resource.validate_before_transform(self.TestProperties) + + @parameterized.expand( + [ + # Single type error + ( + [{"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}], + 1, + ["Property 'ConditionalVar1' value must be integer."], + ), + # Union type consolidation + ( + [ + {"loc": ("ExclusiveVar1",), "msg": "not a valid str", "type": "type_error.str"}, + {"loc": ("ExclusiveVar1",), "msg": "not a valid int", "type": "type_error.integer"}, + ], + 1, + ["Property 'ExclusiveVar1' value must be string or integer."], + ), + # Missing property error + ( + [{"loc": ("RequiredProperty",), "msg": "field required", "type": "value_error.missing"}], + 1, + ["Property 'RequiredProperty' is required."], + ), + # Invalid property error + ( + [{"loc": ("InvalidProperty",), "msg": "extra fields not permitted", "type": "value_error.extra"}], + 1, + ["Property 'InvalidProperty' is an invalid property."], + ), + # Nested property error + ( + [{"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}], + 1, + ["Property 'NestedSetting1.NestedVar1' value must be string."], + ), + # Multiple properties with errors + ( + [ + {"loc": ("ConditionalVar1",), "msg": "not a valid int", "type": "type_error.integer"}, + {"loc": ("ExclusiveVar1",), "msg": "field required", "type": "value_error.missing"}, + {"loc": ("NestedSetting1", "NestedVar1"), "msg": "not a valid str", "type": "type_error.str"}, + ], + 3, + [ + "Property 'ConditionalVar1' value must be integer.", + "Property 'ExclusiveVar1' is required.", + "Property 'NestedSetting1.NestedVar1' value must be string.", + ], + ), + # Fallback error formatting + ( + [{"loc": ("SomeProperty",), "msg": "Some Custom Error Message", "type": "custom_error"}], + 1, + ["Property 'SomeProperty' some custom error message."], + ), + ], + ) + def test_format_all_errors(self, mock_errors, expected_count, expected_messages): + """Test formatting various types of validation errors""" + resource = self.TestResource("TestId") + + formatted_errors = resource._format_all_errors(mock_errors) + self.assertEqual(len(formatted_errors), expected_count) + + for expected_message in expected_messages: + self.assertIn(expected_message, formatted_errors) diff --git a/tests/validator/__init__.py b/tests/validator/__init__.py new file mode 100644 index 000000000..703014275 --- /dev/null +++ b/tests/validator/__init__.py @@ -0,0 +1 @@ +# Validator tests package diff --git a/tests/validator/test_property_rule.py b/tests/validator/test_property_rule.py new file mode 100644 index 000000000..78a33fee9 --- /dev/null +++ b/tests/validator/test_property_rule.py @@ -0,0 +1,170 @@ +from typing import Optional +from unittest import TestCase + +from samtranslator.internal.schema_source.common import BaseModel +from samtranslator.validator.property_rule import PropertyRules, RuleType + + +class PropertyRulesTestSchema(BaseModel): + """Shared test schema for all property rule tests""" + + ImageUri: Optional[str] = None + CodeUri: Optional[str] = None + InlineCode: Optional[str] = None + DeploymentPreference: Optional[str] = None + AutoPublishAlias: Optional[str] = None + Runtime: Optional[str] = None + PackageType: Optional[str] = None + ImageConfig: Optional[dict] = None + Handler: Optional[str] = None + Layers: Optional[list] = None + VpcConfig: Optional[dict] = None + SecurityGroupIds: Optional[list] = None + SubnetIds: Optional[list] = None + + +class TestPropertyRulesCreation(TestCase): + """Test PropertyRules creation and rule building API""" + + def test_create_property_rule(self): + """Test PropertyRules creation""" + rule = PropertyRules() + self.assertIsInstance(rule, PropertyRules) + self.assertEqual(len(rule._rules), 0) + + def test_add_mutually_exclusive_rule(self): + """Test adding mutually exclusive validation rule""" + rule = PropertyRules().addMutuallyExclusive("ImageUri", "CodeUri", "InlineCode") + self.assertEqual(len(rule._rules), 1) + self.assertEqual(rule._rules[0][0], RuleType.MUTUALLY_EXCLUSIVE) + + def test_add_conditional_exclusive_rule(self): + """Test adding conditional exclusive validation rule""" + rule = PropertyRules().addConditionalExclusive("Runtime", ["ImageUri"]) + self.assertEqual(len(rule._rules), 1) + self.assertEqual(rule._rules[0][0], RuleType.CONDITIONAL_EXCLUSIVE) + + def test_add_mutually_inclusive_rule(self): + """Test adding mutually inclusive validation rule""" + rule = PropertyRules().addMutuallyInclusive("VpcConfig", "SecurityGroupIds", "SubnetIds") + self.assertEqual(len(rule._rules), 1) + self.assertEqual(rule._rules[0][0], RuleType.MUTUALLY_INCLUSIVE) + + def test_add_conditional_inclusive_rule(self): + """Test adding conditional inclusive validation rule""" + rule = PropertyRules().addConditionalInclusive("VpcConfig", ["SecurityGroupIds", "SubnetIds"]) + self.assertEqual(len(rule._rules), 1) + self.assertEqual(rule._rules[0][0], RuleType.CONDITIONAL_INCLUSIVE) + + def test_fluent_chaining(self): + """Test fluent API chaining with all rule types""" + rule = ( + PropertyRules() + .addMutuallyExclusive("ImageUri", "CodeUri", "InlineCode") + .addConditionalExclusive("Runtime", ["ImageUri"]) + .addMutuallyInclusive("VpcConfig", "SecurityGroupIds", "SubnetIds") + .addConditionalInclusive("VpcConfig", ["SecurityGroupIds", "SubnetIds"]) + ) + self.assertEqual(len(rule._rules), 4) + + +class TestPropertyRulesValidation(TestCase): + """Test PropertyRules validation logic with different rule types""" + + def test_validate_property_names_valid(self): + """Test validation with valid property names""" + PropertyRules().addMutuallyExclusive("ImageUri", "CodeUri") + + def test_validate_property_names_invalid(self): + """Test validation with invalid property names""" + rules = PropertyRules().addMutuallyExclusive("InvalidProperty", "CodeUri") + validated_model = PropertyRulesTestSchema() + errors = rules.validate_all(validated_model) + self.assertTrue(any("InvalidProperty" in error for error in errors)) + + +class TestMutuallyExclusiveRules(TestCase): + """Test mutually exclusive validation rules""" + + def test_mutually_exclusive_success(self): + """Test mutually exclusive rule validation - success case""" + rules = PropertyRules().addMutuallyExclusive("ImageUri", "CodeUri", "InlineCode") + validated_model = PropertyRulesTestSchema(ImageUri="some-image", CodeUri=None, InlineCode=None) + errors = rules.validate_all(validated_model) + self.assertEqual(errors, []) + + def test_mutually_exclusive_failure(self): + """Test mutually exclusive rule validation - failure case""" + rules = PropertyRules().addMutuallyExclusive("ImageUri", "CodeUri", "InlineCode") + validated_model = PropertyRulesTestSchema(ImageUri="some-image", CodeUri="some-code", InlineCode=None) + errors = rules.validate_all(validated_model) + self.assertEqual(len(errors), 1) + self.assertIn("Cannot specify", errors[0]) + + +class TestConditionalExclusiveRules(TestCase): + """Test conditional exclusive validation rules""" + + def test_conditional_exclusive_success(self): + """Test conditional exclusive rule validation - success case""" + rules = PropertyRules().addConditionalExclusive("Runtime", ["ImageUri"]) + validated_model = PropertyRulesTestSchema(Runtime="python3.9", ImageUri=None) + errors = rules.validate_all(validated_model) + self.assertEqual(errors, []) + + def test_conditional_exclusive_failure(self): + """Test conditional exclusive rule validation - failure case""" + rules = PropertyRules().addConditionalExclusive("Runtime", ["ImageUri"]) + validated_model = PropertyRulesTestSchema(Runtime="python3.9", ImageUri="some-image") + errors = rules.validate_all(validated_model) + self.assertEqual(len(errors), 1) + self.assertIn("'Runtime' cannot be used with 'ImageUri'", errors[0]) + + +class TestMutuallyInclusiveRules(TestCase): + """Test mutually inclusive validation rules""" + + def test_mutually_inclusive_success_all_present(self): + """Test mutually inclusive rule validation - all present""" + rules = PropertyRules().addMutuallyInclusive("VpcConfig", "SecurityGroupIds", "SubnetIds") + validated_model = PropertyRulesTestSchema( + VpcConfig={"some": "config"}, SecurityGroupIds=["sg-123"], SubnetIds=["subnet-123"] + ) + errors = rules.validate_all(validated_model) + self.assertEqual(errors, []) + + def test_mutually_inclusive_success_none_present(self): + """Test mutually inclusive rule validation - none present""" + rules = PropertyRules().addMutuallyInclusive("VpcConfig", "SecurityGroupIds", "SubnetIds") + validated_model = PropertyRulesTestSchema(VpcConfig=None, SecurityGroupIds=None, SubnetIds=None) + errors = rules.validate_all(validated_model) + self.assertEqual(errors, []) + + def test_mutually_inclusive_failure(self): + """Test mutually inclusive rule validation - partial present""" + rules = PropertyRules().addMutuallyInclusive("VpcConfig", "SecurityGroupIds", "SubnetIds") + validated_model = PropertyRulesTestSchema(VpcConfig={"some": "config"}, SecurityGroupIds=None, SubnetIds=None) + errors = rules.validate_all(validated_model) + self.assertEqual(len(errors), 1) + self.assertIn("When using 'VpcConfig', you must also specify", errors[0]) + + +class TestConditionalInclusiveRules(TestCase): + """Test conditional inclusive validation rules""" + + def test_conditional_inclusive_success(self): + """Test conditional inclusive rule validation - success case""" + rules = PropertyRules().addConditionalInclusive("VpcConfig", ["SecurityGroupIds", "SubnetIds"]) + validated_model = PropertyRulesTestSchema( + VpcConfig={"some": "config"}, SecurityGroupIds=["sg-123"], SubnetIds=["subnet-123"] + ) + errors = rules.validate_all(validated_model) + self.assertEqual(errors, []) + + def test_conditional_inclusive_failure(self): + """Test conditional inclusive rule validation - failure case""" + rules = PropertyRules().addConditionalInclusive("VpcConfig", ["SecurityGroupIds", "SubnetIds"]) + validated_model = PropertyRulesTestSchema(VpcConfig={"some": "config"}, SecurityGroupIds=None, SubnetIds=None) + errors = rules.validate_all(validated_model) + self.assertEqual(len(errors), 1) + self.assertIn("'VpcConfig' requires all of:", errors[0]) From 9d2125c77bf98c64bcac77c7567c148476a11a39 Mon Sep 17 00:00:00 2001 From: Vichym Date: Wed, 27 Aug 2025 01:25:08 -0700 Subject: [PATCH 5/5] address comments --- tests/test_model.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_model.py b/tests/test_model.py index a506e42cf..6d8dcb32d 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -630,13 +630,13 @@ def get_property_validation_rules(self): resource.ConditionalVar3 = None resource.validate_before_transform(self.TestProperties) - # Test 3: When NestedSetting1.NestedVar1="test", NestedSetting2.NestedVar2 should NOT be present + # Test 3: When NestedSetting1.NestedVar1="test", NestedSetting2.NestedVar2=22 should NOT be present resource.NestedSetting1 = {"NestedVar1": "test"} - resource.NestedSetting2 = {"NestedVar2": "value2"} + resource.NestedSetting2 = {"NestedVar2": 22} with self.assertRaises(InvalidResourceException) as error_3: resource.validate_before_transform(self.TestProperties) self.assertIn( - "'NestedSetting1.NestedVar1=test' cannot be used with 'NestedSetting2.NestedVar2'", + "'NestedSetting1.NestedVar1=test' cannot be used with 'NestedSetting2.NestedVar2=22'", error_3.exception.message, )