From 81732d477b4cbb26530dd610a928e1faccb39483 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Tue, 2 Sep 2025 00:05:46 -0700 Subject: [PATCH] Add validation for compatible schema evolution when partition fields or sort order fields reference schema fields --- pyiceberg/partitioning.py | 34 +++++++++++++++++- pyiceberg/table/sorting.py | 17 ++++++++- pyiceberg/table/update/__init__.py | 6 ++++ tests/conftest.py | 6 ++++ tests/integration/test_catalog.py | 58 ++++++++++++++++++++++++++++++ tests/table/test_partitioning.py | 34 ++++++++++++++++++ tests/table/test_sorting.py | 38 +++++++++++++++++++- 7 files changed, 190 insertions(+), 3 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 408126d3b3..a19c13ffa2 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -32,7 +32,8 @@ model_validator, ) -from pyiceberg.schema import Schema +from pyiceberg.exceptions import ValidationError +from pyiceberg.schema import Schema, _index_parents, index_by_id from pyiceberg.transforms import ( BucketTransform, DayTransform, @@ -245,6 +246,37 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) return path + @staticmethod + def check_compatibility(partition_spec: PartitionSpec, schema: Schema, allow_missing_fields: bool = False) -> None: + # if the underlying field is dropped, we cannot check they are compatible -- continue + schema_fields = index_by_id(schema) + parents = _index_parents(schema) + + def validate_parents_are_structs(field_id: int) -> None: + parent_id = parents.get(field_id) + while parent_id: + parent_type = schema.find_type(parent_id) + if not parent_type.is_struct: + raise ValidationError("Invalid partition field parent: %s", parent_type) + parent_id = parents.get(parent_id) + + for field in partition_spec.fields: + source_field = schema_fields.get(field.source_id) + if allow_missing_fields and source_field: + continue + + if not isinstance(field.transform, VoidTransform): + if source_field: + source_type = source_field.field_type + if not source_type.is_primitive: + raise ValidationError(f"Cannot partition by non-primitive source field: {source_type}") + if not field.transform.can_transform(source_type): + raise ValidationError(f"Invalid source type {source_type} for transform: {field.transform}") + # The only valid parent types for a PartitionField are StructTypes. This must be checked recursively + validate_parents_are_structs(field.source_id) + else: + raise ValidationError(f"Cannot find source column for partition field: {field}") + UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0) diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index 244c8ba867..5b39b02e9d 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=keyword-arg-before-vararg +from __future__ import annotations + from enum import Enum from typing import Annotated, Any, Callable, Dict, List, Optional, Union @@ -26,7 +28,8 @@ model_validator, ) -from pyiceberg.schema import Schema +from pyiceberg.exceptions import ValidationError +from pyiceberg.schema import Schema, index_by_id from pyiceberg.transforms import IdentityTransform, Transform, parse_transform from pyiceberg.typedef import IcebergBaseModel from pyiceberg.types import IcebergType @@ -168,6 +171,18 @@ def __repr__(self) -> str: fields = f"{', '.join(repr(column) for column in self.fields)}, " if self.fields else "" return f"SortOrder({fields}order_id={self.order_id})" + @staticmethod + def check_compatibility(sort_order: SortOrder, schema: Schema) -> None: + schema_ids = index_by_id(schema) + for field in sort_order.fields: + if source_field := schema_ids.get(field.source_id): + if not source_field.field_type.is_primitive: + raise ValidationError(f"Cannot sort by non-primitive source field: {source_field}") + if not field.transform.can_transform(source_field.field_type): + raise ValidationError(f"Invalid source type {source_field.field_type} for transform: {field.transform}") + else: + raise ValidationError(f"Cannot find source column for sort field: {field}") + UNSORTED_SORT_ORDER_ID = 0 UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index c30d960d38..341b6125f3 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -642,6 +642,12 @@ def update_table_metadata( if base_metadata.last_updated_ms == new_metadata.last_updated_ms: new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())}) + # Check correctness of partition spec, and sort order + PartitionSpec.check_compatibility(new_metadata.spec(), new_metadata.schema()) + + if sort_order := new_metadata.sort_order_by_id(new_metadata.default_sort_order_id): + SortOrder.check_compatibility(sort_order, new_metadata.schema()) + if enforce_validation: return TableMetadataUtil.parse_obj(new_metadata.model_dump()) else: diff --git a/tests/conftest.py b/tests/conftest.py index 5aff45c1ed..7f29d810ce 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -73,6 +73,7 @@ from pyiceberg.serializers import ToOutputFile from pyiceberg.table import FileScanTask, Table from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 +from pyiceberg.table.sorting import NullOrder, SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( BinaryType, @@ -1875,6 +1876,11 @@ def test_partition_spec() -> Schema: ) +@pytest.fixture(scope="session") +def test_sort_order() -> SortOrder: + return SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)) + + @pytest.fixture(scope="session") def generated_manifest_entry_file( avro_schema_manifest_entry: Dict[str, Any], test_schema: Schema, test_partition_spec: PartitionSpec diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index be93382daa..97af35e87a 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -31,9 +31,14 @@ NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError, + ValidationError, ) from pyiceberg.io import WAREHOUSE +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortOrder +from pyiceberg.transforms import DayTransform +from pyiceberg.types import IntegerType, NestedField, TimestampType from tests.conftest import clean_up @@ -247,6 +252,59 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa assert test_catalog.table_exists((database_name, table_name)) is True +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_incompatible_partitioned_schema_evolution( + test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str +) -> None: + if isinstance(test_catalog, HiveCatalog): + pytest.skip("HiveCatalog does not support schema evolution") + + identifier = (database_name, table_name) + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, test_schema, partition_spec=test_partition_spec) + assert test_catalog.table_exists(identifier) + + with pytest.raises(ValidationError): + with table.update_schema() as update: + update.delete_column("VendorID") + + # Assert column was not dropped + assert "VendorID" in table.schema().column_names + + with table.transaction() as transaction: + with transaction.update_spec() as spec_update: + spec_update.remove_field("VendorID") + + with transaction.update_schema() as schema_update: + schema_update.delete_column("VendorID") + + assert table.spec() == PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1) + assert table.schema() == Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False)) + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_incompatible_sorted_schema_evolution( + test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str +) -> None: + if isinstance(test_catalog, HiveCatalog): + pytest.skip("HiveCatalog does not support schema evolution") + + identifier = (database_name, table_name) + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, test_schema, sort_order=test_sort_order) + assert test_catalog.table_exists(identifier) + + with pytest.raises(ValidationError): + with table.update_schema() as update: + update.delete_column("VendorID") + + assert table.schema() == Schema( + NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False) + ) + + @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_create_namespace(test_catalog: Catalog, database_name: str) -> None: diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index 0fe22391c0..c9336fdfb3 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -21,6 +21,7 @@ import pytest +from pyiceberg.exceptions import ValidationError from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.transforms import ( @@ -223,3 +224,36 @@ def test_deserialize_partition_field_v3() -> None: field = PartitionField.model_validate_json(json_partition_spec) assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + + +def test_incompatible_source_column_not_found() -> None: + schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType())) + + spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "some_partition")) + + with pytest.raises(ValidationError) as exc: + PartitionSpec.check_compatibility(spec, schema) + + assert "Cannot find source column for partition field: 1000: some_partition: identity(3)" in str(exc.value) + + +def test_incompatible_non_primitive_type() -> None: + schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType())) + + spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "some_partition")) + + with pytest.raises(ValidationError) as exc: + PartitionSpec.check_compatibility(spec, schema) + + assert "Cannot partition by non-primitive source field: struct<>" in str(exc.value) + + +def test_incompatible_transform_source_type() -> None: + schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType())) + + spec = PartitionSpec(PartitionField(1, 1000, YearTransform(), "some_partition")) + + with pytest.raises(ValidationError) as exc: + PartitionSpec.check_compatibility(spec, schema) + + assert "Invalid source type int for transform: year" in str(exc.value) diff --git a/tests/table/test_sorting.py b/tests/table/test_sorting.py index 3efda56509..3082bc8b2a 100644 --- a/tests/table/test_sorting.py +++ b/tests/table/test_sorting.py @@ -20,6 +20,8 @@ import pytest +from pyiceberg.exceptions import ValidationError +from pyiceberg.schema import Schema from pyiceberg.table.metadata import TableMetadataUtil from pyiceberg.table.sorting import ( UNSORTED_SORT_ORDER, @@ -28,7 +30,8 @@ SortField, SortOrder, ) -from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform +from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform, YearTransform +from pyiceberg.types import IntegerType, NestedField, StructType @pytest.fixture @@ -114,3 +117,36 @@ def test_serialize_sort_field_v3() -> None: expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST) payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}' assert SortField.model_validate_json(payload) == expected + + +def test_incompatible_source_column_not_found(sort_order: SortOrder) -> None: + schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType())) + + with pytest.raises(ValidationError) as exc: + SortOrder.check_compatibility(sort_order, schema) + + assert "Cannot find source column for sort field: 19 ASC NULLS FIRST" in str(exc.value) + + +def test_incompatible_non_primitive_type() -> None: + schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType())) + + sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)) + + with pytest.raises(ValidationError) as exc: + SortOrder.check_compatibility(sort_order, schema) + + assert "Cannot sort by non-primitive source field: 1: foo: optional struct<>" in str(exc.value) + + +def test_incompatible_transform_source_type() -> None: + schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType())) + + sort_order = SortOrder( + SortField(source_id=1, transform=YearTransform(), null_order=NullOrder.NULLS_FIRST), + ) + + with pytest.raises(ValidationError) as exc: + SortOrder.check_compatibility(sort_order, schema) + + assert "Invalid source type int for transform: year" in str(exc.value)