Skip to content

Commit 81732d4

Browse files
committed
Add validation for compatible schema evolution when partition fields or sort order fields reference schema fields
1 parent 19ba343 commit 81732d4

File tree

7 files changed

+190
-3
lines changed

7 files changed

+190
-3
lines changed

pyiceberg/partitioning.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
model_validator,
3333
)
3434

35-
from pyiceberg.schema import Schema
35+
from pyiceberg.exceptions import ValidationError
36+
from pyiceberg.schema import Schema, _index_parents, index_by_id
3637
from pyiceberg.transforms import (
3738
BucketTransform,
3839
DayTransform,
@@ -245,6 +246,37 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:
245246
path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)])
246247
return path
247248

249+
@staticmethod
250+
def check_compatibility(partition_spec: PartitionSpec, schema: Schema, allow_missing_fields: bool = False) -> None:
251+
# if the underlying field is dropped, we cannot check they are compatible -- continue
252+
schema_fields = index_by_id(schema)
253+
parents = _index_parents(schema)
254+
255+
def validate_parents_are_structs(field_id: int) -> None:
256+
parent_id = parents.get(field_id)
257+
while parent_id:
258+
parent_type = schema.find_type(parent_id)
259+
if not parent_type.is_struct:
260+
raise ValidationError("Invalid partition field parent: %s", parent_type)
261+
parent_id = parents.get(parent_id)
262+
263+
for field in partition_spec.fields:
264+
source_field = schema_fields.get(field.source_id)
265+
if allow_missing_fields and source_field:
266+
continue
267+
268+
if not isinstance(field.transform, VoidTransform):
269+
if source_field:
270+
source_type = source_field.field_type
271+
if not source_type.is_primitive:
272+
raise ValidationError(f"Cannot partition by non-primitive source field: {source_type}")
273+
if not field.transform.can_transform(source_type):
274+
raise ValidationError(f"Invalid source type {source_type} for transform: {field.transform}")
275+
# The only valid parent types for a PartitionField are StructTypes. This must be checked recursively
276+
validate_parents_are_structs(field.source_id)
277+
else:
278+
raise ValidationError(f"Cannot find source column for partition field: {field}")
279+
248280

249281
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
250282

pyiceberg/table/sorting.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint: disable=keyword-arg-before-vararg
18+
from __future__ import annotations
19+
1820
from enum import Enum
1921
from typing import Annotated, Any, Callable, Dict, List, Optional, Union
2022

@@ -26,7 +28,8 @@
2628
model_validator,
2729
)
2830

29-
from pyiceberg.schema import Schema
31+
from pyiceberg.exceptions import ValidationError
32+
from pyiceberg.schema import Schema, index_by_id
3033
from pyiceberg.transforms import IdentityTransform, Transform, parse_transform
3134
from pyiceberg.typedef import IcebergBaseModel
3235
from pyiceberg.types import IcebergType
@@ -168,6 +171,18 @@ def __repr__(self) -> str:
168171
fields = f"{', '.join(repr(column) for column in self.fields)}, " if self.fields else ""
169172
return f"SortOrder({fields}order_id={self.order_id})"
170173

174+
@staticmethod
175+
def check_compatibility(sort_order: SortOrder, schema: Schema) -> None:
176+
schema_ids = index_by_id(schema)
177+
for field in sort_order.fields:
178+
if source_field := schema_ids.get(field.source_id):
179+
if not source_field.field_type.is_primitive:
180+
raise ValidationError(f"Cannot sort by non-primitive source field: {source_field}")
181+
if not field.transform.can_transform(source_field.field_type):
182+
raise ValidationError(f"Invalid source type {source_field.field_type} for transform: {field.transform}")
183+
else:
184+
raise ValidationError(f"Cannot find source column for sort field: {field}")
185+
171186

172187
UNSORTED_SORT_ORDER_ID = 0
173188
UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)

pyiceberg/table/update/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,12 @@ def update_table_metadata(
642642
if base_metadata.last_updated_ms == new_metadata.last_updated_ms:
643643
new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})
644644

645+
# Check correctness of partition spec, and sort order
646+
PartitionSpec.check_compatibility(new_metadata.spec(), new_metadata.schema())
647+
648+
if sort_order := new_metadata.sort_order_by_id(new_metadata.default_sort_order_id):
649+
SortOrder.check_compatibility(sort_order, new_metadata.schema())
650+
645651
if enforce_validation:
646652
return TableMetadataUtil.parse_obj(new_metadata.model_dump())
647653
else:

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
from pyiceberg.serializers import ToOutputFile
7474
from pyiceberg.table import FileScanTask, Table
7575
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
76+
from pyiceberg.table.sorting import NullOrder, SortField, SortOrder
7677
from pyiceberg.transforms import DayTransform, IdentityTransform
7778
from pyiceberg.types import (
7879
BinaryType,
@@ -1875,6 +1876,11 @@ def test_partition_spec() -> Schema:
18751876
)
18761877

18771878

1879+
@pytest.fixture(scope="session")
1880+
def test_sort_order() -> SortOrder:
1881+
return SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
1882+
1883+
18781884
@pytest.fixture(scope="session")
18791885
def generated_manifest_entry_file(
18801886
avro_schema_manifest_entry: Dict[str, Any], test_schema: Schema, test_partition_spec: PartitionSpec

tests/integration/test_catalog.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@
3131
NoSuchNamespaceError,
3232
NoSuchTableError,
3333
TableAlreadyExistsError,
34+
ValidationError,
3435
)
3536
from pyiceberg.io import WAREHOUSE
37+
from pyiceberg.partitioning import PartitionField, PartitionSpec
3638
from pyiceberg.schema import Schema
39+
from pyiceberg.table.sorting import SortOrder
40+
from pyiceberg.transforms import DayTransform
41+
from pyiceberg.types import IntegerType, NestedField, TimestampType
3742
from tests.conftest import clean_up
3843

3944

@@ -247,6 +252,59 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
247252
assert test_catalog.table_exists((database_name, table_name)) is True
248253

249254

255+
@pytest.mark.integration
256+
@pytest.mark.parametrize("test_catalog", CATALOGS)
257+
def test_incompatible_partitioned_schema_evolution(
258+
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str
259+
) -> None:
260+
if isinstance(test_catalog, HiveCatalog):
261+
pytest.skip("HiveCatalog does not support schema evolution")
262+
263+
identifier = (database_name, table_name)
264+
test_catalog.create_namespace(database_name)
265+
table = test_catalog.create_table(identifier, test_schema, partition_spec=test_partition_spec)
266+
assert test_catalog.table_exists(identifier)
267+
268+
with pytest.raises(ValidationError):
269+
with table.update_schema() as update:
270+
update.delete_column("VendorID")
271+
272+
# Assert column was not dropped
273+
assert "VendorID" in table.schema().column_names
274+
275+
with table.transaction() as transaction:
276+
with transaction.update_spec() as spec_update:
277+
spec_update.remove_field("VendorID")
278+
279+
with transaction.update_schema() as schema_update:
280+
schema_update.delete_column("VendorID")
281+
282+
assert table.spec() == PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1)
283+
assert table.schema() == Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
284+
285+
286+
@pytest.mark.integration
287+
@pytest.mark.parametrize("test_catalog", CATALOGS)
288+
def test_incompatible_sorted_schema_evolution(
289+
test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str
290+
) -> None:
291+
if isinstance(test_catalog, HiveCatalog):
292+
pytest.skip("HiveCatalog does not support schema evolution")
293+
294+
identifier = (database_name, table_name)
295+
test_catalog.create_namespace(database_name)
296+
table = test_catalog.create_table(identifier, test_schema, sort_order=test_sort_order)
297+
assert test_catalog.table_exists(identifier)
298+
299+
with pytest.raises(ValidationError):
300+
with table.update_schema() as update:
301+
update.delete_column("VendorID")
302+
303+
assert table.schema() == Schema(
304+
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
305+
)
306+
307+
250308
@pytest.mark.integration
251309
@pytest.mark.parametrize("test_catalog", CATALOGS)
252310
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:

tests/table/test_partitioning.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import pytest
2323

24+
from pyiceberg.exceptions import ValidationError
2425
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
2526
from pyiceberg.schema import Schema
2627
from pyiceberg.transforms import (
@@ -223,3 +224,36 @@ def test_deserialize_partition_field_v3() -> None:
223224

224225
field = PartitionField.model_validate_json(json_partition_spec)
225226
assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
227+
228+
229+
def test_incompatible_source_column_not_found() -> None:
230+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
231+
232+
spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "some_partition"))
233+
234+
with pytest.raises(ValidationError) as exc:
235+
PartitionSpec.check_compatibility(spec, schema)
236+
237+
assert "Cannot find source column for partition field: 1000: some_partition: identity(3)" in str(exc.value)
238+
239+
240+
def test_incompatible_non_primitive_type() -> None:
241+
schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType()))
242+
243+
spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "some_partition"))
244+
245+
with pytest.raises(ValidationError) as exc:
246+
PartitionSpec.check_compatibility(spec, schema)
247+
248+
assert "Cannot partition by non-primitive source field: struct<>" in str(exc.value)
249+
250+
251+
def test_incompatible_transform_source_type() -> None:
252+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
253+
254+
spec = PartitionSpec(PartitionField(1, 1000, YearTransform(), "some_partition"))
255+
256+
with pytest.raises(ValidationError) as exc:
257+
PartitionSpec.check_compatibility(spec, schema)
258+
259+
assert "Invalid source type int for transform: year" in str(exc.value)

tests/table/test_sorting.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import pytest
2222

23+
from pyiceberg.exceptions import ValidationError
24+
from pyiceberg.schema import Schema
2325
from pyiceberg.table.metadata import TableMetadataUtil
2426
from pyiceberg.table.sorting import (
2527
UNSORTED_SORT_ORDER,
@@ -28,7 +30,8 @@
2830
SortField,
2931
SortOrder,
3032
)
31-
from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform
33+
from pyiceberg.transforms import BucketTransform, IdentityTransform, VoidTransform, YearTransform
34+
from pyiceberg.types import IntegerType, NestedField, StructType
3235

3336

3437
@pytest.fixture
@@ -114,3 +117,36 @@ def test_serialize_sort_field_v3() -> None:
114117
expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)
115118
payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}'
116119
assert SortField.model_validate_json(payload) == expected
120+
121+
122+
def test_incompatible_source_column_not_found(sort_order: SortOrder) -> None:
123+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
124+
125+
with pytest.raises(ValidationError) as exc:
126+
SortOrder.check_compatibility(sort_order, schema)
127+
128+
assert "Cannot find source column for sort field: 19 ASC NULLS FIRST" in str(exc.value)
129+
130+
131+
def test_incompatible_non_primitive_type() -> None:
132+
schema = Schema(NestedField(1, "foo", StructType()), NestedField(2, "bar", IntegerType()))
133+
134+
sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
135+
136+
with pytest.raises(ValidationError) as exc:
137+
SortOrder.check_compatibility(sort_order, schema)
138+
139+
assert "Cannot sort by non-primitive source field: 1: foo: optional struct<>" in str(exc.value)
140+
141+
142+
def test_incompatible_transform_source_type() -> None:
143+
schema = Schema(NestedField(1, "foo", IntegerType()), NestedField(2, "bar", IntegerType()))
144+
145+
sort_order = SortOrder(
146+
SortField(source_id=1, transform=YearTransform(), null_order=NullOrder.NULLS_FIRST),
147+
)
148+
149+
with pytest.raises(ValidationError) as exc:
150+
SortOrder.check_compatibility(sort_order, schema)
151+
152+
assert "Invalid source type int for transform: year" in str(exc.value)

0 commit comments

Comments
 (0)