From 677f196f9e890a764085ec7e05ca90d12abc16e4 Mon Sep 17 00:00:00 2001
From: Jeronimo Martinez
Date: Mon, 27 Oct 2025 10:32:22 +0100
Subject: [PATCH 1/3] Remove constraint for adding files with field IDs

---
 pyiceberg/io/pyarrow.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index efeaa4a2c2..748a322dab 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2641,10 +2641,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
         parquet_metadata = pq.read_metadata(input_stream)
         arrow_schema = parquet_metadata.schema.to_arrow_schema()
 
-    if visit_pyarrow(arrow_schema, _HasIds()):
-        raise NotImplementedError(
-            f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
-        )
 
     schema = table_metadata.schema()
     _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)

From 1addf609da501f7240e1e222156429ad7f1e7d29 Mon Sep 17 00:00:00 2001
From: Jeronimo Martinez
Date: Mon, 27 Oct 2025 10:35:23 +0100
Subject: [PATCH 2/3] Add constraint to avoid adding files with conflicting field IDs

---
 mkdocs/docs/api.md                  |   8 +-
 pyiceberg/io/pyarrow.py             |  38 +++++++-
 tests/integration/test_add_files.py | 140 ++++++++++++++++++++++++++--
 3 files changed, 176 insertions(+), 10 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0e0dc375de..b0d14acc29 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -1006,9 +1006,11 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg
 
-!!! note "Name Mapping"
-    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
-
+!!! note "Name Mapping and Field IDs"
+    `add_files` can work with Parquet files both with and without field IDs in their metadata:
+
+    - **Files with field IDs**: When field IDs are present in the Parquet metadata, they must match the corresponding field IDs in the Iceberg table schema. This is common for files generated by tools like Spark, or by other libraries that write explicit field ID metadata.
+    - **Files without field IDs**: When field IDs are absent, the table must have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) to map field names to Iceberg field IDs. `add_files` automatically creates a Name Mapping based on the table's current schema if one doesn't already exist.
+
+    In both cases, a Name Mapping is created if the table doesn't already have one, so the added files remain readable by engines that rely on it.
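+
+    For example, a file that already carries matching field IDs can be added directly. A minimal sketch (the catalog, table, schema, and path names below are illustrative, not part of this change):
+
+    ```python
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+
+    from pyiceberg.catalog import load_catalog
+
+    catalog = load_catalog("default")
+    # Illustrative table whose Iceberg schema is: 1: city (string), 2: fare (double)
+    tbl = catalog.load_table("default.taxi_dataset")
+
+    # Field IDs embedded in the Arrow schema metadata are written to the Parquet
+    # footer and must line up with the Iceberg field IDs above.
+    file_schema = pa.schema(
+        [
+            pa.field("city", pa.string(), metadata={"PARQUET:field_id": "1"}),
+            pa.field("fare", pa.float64(), metadata={"PARQUET:field_id": "2"}),
+        ]
+    )
+    pq.write_table(pa.table({"city": ["Amsterdam"], "fare": [12.5]}, schema=file_schema), "/tmp/with_ids.parquet")
+
+    tbl.add_files(file_paths=["/tmp/with_ids.parquet"])
+    ```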
 
 !!! note "Partitions"
     `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported).
     Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
 
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 748a322dab..f3828990eb 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2610,7 +2610,10 @@ def _check_pyarrow_schema_compatible(
     Raises:
         ValueError: If the schemas are not compatible.
     """
+    # Check if the PyArrow schema has explicit field IDs
+    has_field_ids = visit_pyarrow(provided_schema, _HasIds())
     name_mapping = requested_schema.name_mapping
+
     try:
         provided_schema = pyarrow_to_schema(
             provided_schema,
@@ -2624,8 +2627,41 @@ def _check_pyarrow_schema_compatible(
         )
         additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
         raise ValueError(
-            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
+            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. "
+            "Update the schema first (hint, use union_by_name)."
         ) from e
+
+    # If the file has explicit field IDs, validate they match the table schema exactly
+    if has_field_ids:
+        # Build mappings for both schemas (including nested fields)
+        requested_id_to_name = requested_schema._lazy_id_to_name
+        provided_id_to_name = provided_schema._lazy_id_to_name
+
+        # Also build reverse mapping: path -> field_id for the table
+        requested_name_to_id = {path: field_id for field_id, path in requested_id_to_name.items()}
+
+        # Check that all field paths in the file have matching field IDs in the table
+        mismatched_fields = []
+        for field_id, provided_path in provided_id_to_name.items():
+            # Check if this path exists in the table schema
+            expected_field_id = requested_name_to_id.get(provided_path)
+            if expected_field_id is None:
+                # The file has a field path that doesn't exist in the table at all
+                # This will be caught by _check_schema_compatible later, so skip it here
+                continue
+            elif expected_field_id != field_id:
+                # Same path, different field ID - this is the critical error
+                mismatched_fields.append(
+                    f"'{provided_path}': table expects field_id={expected_field_id}, file has field_id={field_id}"
+                )
+
+        if mismatched_fields:
+            raise ValueError(
+                "Field IDs in Parquet file do not match table schema. When field IDs are explicitly set in the "
+                "Parquet metadata, they must match the Iceberg table schema.\nMismatched fields:\n"
+                + "\n".join(f"  - {field}" for field in mismatched_fields)
+            )
+
     _check_schema_compatible(requested_schema, provided_schema)
 
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index 47e56be1f3..372353580b 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -47,6 +47,7 @@
     LongType,
     NestedField,
     StringType,
+    StructType,
     TimestampType,
     TimestamptzType,
 )
@@ -216,14 +217,14 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(
 
 
 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_has_field_ids(
+def test_add_files_to_unpartitioned_table_with_field_ids(
     spark: SparkSession, session_catalog: Catalog, format_version: int
 ) -> None:
-    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
+    identifier = f"default.unpartitioned_with_field_ids_v{format_version}"
     tbl = _create_table(session_catalog, identifier, format_version)
 
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
-    # write parquet files
+    file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files with field IDs matching the table schema
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
@@ -231,8 +232,135 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(
             writer.write_table(ARROW_TABLE_WITH_IDS)
 
     # add the parquet files as data files
-    with pytest.raises(NotImplementedError):
-        tbl.add_files(file_paths=file_paths)
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping should still be set even though files have field IDs
+    assert tbl.name_mapping() is not None
+
+    # Verify files were added successfully
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+        """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    # Verify data can be read back correctly
+    df = spark.table(identifier).toPandas()
+    assert len(df) == 5
+    assert all(df["foo"] == True)  # noqa: E712
+    assert all(df["bar"] == "bar_string")
+    assert all(df["baz"] == 123)
+    assert all(df["qux"] == date(2024, 3, 7))
+
+
+@pytest.mark.integration
+def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    # Create schema with field IDs that don't match the table schema
+    # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog)
+    # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux)
+    mismatched_schema = pa.schema(
+        [
+            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+            pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}),  # Wrong: should be 3
+            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}),  # Wrong: should be 4
+        ]
+    )
+
+    file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet"
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=mismatched_schema) as writer:
+            writer.write_table(ARROW_TABLE_WITH_IDS)
+
+    # Adding files with mismatched field IDs should fail
+    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
+        tbl.add_files(file_paths=[file_path])
+
+
+@pytest.mark.integration
+def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    """Test that files with mismatched nested (struct) field IDs are rejected."""
+    identifier = f"default.nested_mismatched_field_ids_v{format_version}"
+
+    # Create a table with a nested struct field
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    nested_schema = Schema(
+        NestedField(1, "id", IntegerType(), required=False),
+        NestedField(
+            2,
+            "user",
+            StructType(
+                NestedField(3, "name", StringType(), required=False),
+                NestedField(4, "age", IntegerType(), required=False),
+            ),
+            required=False,
+        ),
+        schema_id=0,
+    )
+
+    tbl = session_catalog.create_table(
+        identifier=identifier,
+        schema=nested_schema,
+        properties={"format-version": str(format_version)},
+    )
+
+    # Create PyArrow schema with MISMATCHED nested field IDs
+    # The table expects: id=1, user=2, user.name=3, user.age=4
+    # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs)
+    pa_schema_mismatched = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}),
+            pa.field(
+                "user",
+                pa.struct(
+                    [
+                        pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}),  # Wrong!
+                        pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}),  # Wrong!
+                    ]
+                ),
+                nullable=True,
+                metadata={b"PARQUET:field_id": b"2"},
+            ),
+        ]
+    )
+
+    pa_table = pa.table(
+        {
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "user": pa.array(
+                [
+                    {"name": "Alice", "age": 30},
+                    {"name": "Bob", "age": 25},
+                    {"name": "Charlie", "age": 35},
+                ],
+                type=pa_schema_mismatched.field("user").type,
+            ),
+        },
+        schema=pa_schema_mismatched,
+    )
+
+    file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet"
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer:
+            writer.write_table(pa_table)
+
+    # Adding files with mismatched nested field IDs should fail
+    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
+        tbl.add_files(file_paths=[file_path])
 
 
 @pytest.mark.integration

From 7e3989614da576292fbf5f7ba88ff1914afb120f Mon Sep 17 00:00:00 2001
From: Jeronimo Martinez
Date: Tue, 28 Oct 2025 13:33:50 +0100
Subject: [PATCH 3/3] remove name checks and rely only on ids when available to check compatibility

---
 pyiceberg/io/pyarrow.py             |  37 +------
 tests/integration/test_add_files.py | 165 ++++++++++------------------
 2 files changed, 60 insertions(+), 142 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f3828990eb..9a89775d82 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2610,8 +2610,6 @@ def _check_pyarrow_schema_compatible(
     Raises:
         ValueError: If the schemas are not compatible.
""" - # Check if the PyArrow schema has explicit field IDs - has_field_ids = visit_pyarrow(provided_schema, _HasIds()) name_mapping = requested_schema.name_mapping try: @@ -2627,41 +2625,8 @@ def _check_pyarrow_schema_compatible( ) additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys()) raise ValueError( - f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. " - "Update the schema first (hint, use union_by_name)." + f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." ) from e - - # If the file has explicit field IDs, validate they match the table schema exactly - if has_field_ids: - # Build mappings for both schemas (including nested fields) - requested_id_to_name = requested_schema._lazy_id_to_name - provided_id_to_name = provided_schema._lazy_id_to_name - - # Also build reverse mapping: path -> field_id for the table - requested_name_to_id = {path: field_id for field_id, path in requested_id_to_name.items()} - - # Check that all field paths in the file have matching field IDs in the table - mismatched_fields = [] - for field_id, provided_path in provided_id_to_name.items(): - # Check if this path exists in the table schema - expected_field_id = requested_name_to_id.get(provided_path) - if expected_field_id is None: - # The file has a field path that doesn't exist in the table at all - # This will be caught by _check_schema_compatible later, so skip it here - continue - elif expected_field_id != field_id: - # Same path, different field ID - this is the critical error - mismatched_fields.append( - f"'{provided_path}': table expects field_id={expected_field_id}, file has field_id={field_id}" - ) - - if mismatched_fields: - raise ValueError( - "Field IDs in Parquet file do not match table schema. 
When field IDs are explicitly set in the " - "Parquet metadata, they must match the Iceberg table schema.\nMismatched fields:\n" - + "\n".join(f" - {field}" for field in mismatched_fields) - ) - _check_schema_compatible(requested_schema, provided_schema) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 372353580b..9e04701b47 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -47,7 +47,6 @@ LongType, NestedField, StringType, - StructType, TimestampType, TimestamptzType, ) @@ -258,111 +257,6 @@ def test_add_files_to_unpartitioned_table_with_field_ids( assert all(df["qux"] == date(2024, 3, 7)) -@pytest.mark.integration -def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - # Create schema with field IDs that don't match the table schema - # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog) - # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux) - mismatched_schema = pa.schema( - [ - pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}), - pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), - pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}), # Wrong: should be 3 - pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}), # Wrong: should be 4 - ] - ) - - file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet" - fo = tbl.io.new_output(file_path) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=mismatched_schema) as writer: - writer.write_table(ARROW_TABLE_WITH_IDS) - - # Adding files with mismatched field IDs should fail - with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): - tbl.add_files(file_paths=[file_path]) - - -@pytest.mark.integration -def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - """Test that files with mismatched nested (struct) field IDs are rejected.""" - identifier = f"default.nested_mismatched_field_ids_v{format_version}" - - # Create a table with a nested struct field - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - nested_schema = Schema( - NestedField(1, "id", IntegerType(), required=False), - NestedField( - 2, - "user", - StructType( - NestedField(3, "name", StringType(), required=False), - NestedField(4, "age", IntegerType(), required=False), - ), - required=False, - ), - schema_id=0, - ) - - tbl = session_catalog.create_table( - identifier=identifier, - schema=nested_schema, - properties={"format-version": str(format_version)}, - ) - - # Create PyArrow schema with MISMATCHED nested field IDs - # The table expects: id=1, user=2, user.name=3, user.age=4 - # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs) - pa_schema_mismatched = pa.schema( - [ - pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}), - pa.field( - "user", - pa.struct( - [ - pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}), # Wrong! - pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}), # Wrong! 
- ] - ), - nullable=True, - metadata={b"PARQUET:field_id": b"2"}, - ), - ] - ) - - pa_table = pa.table( - { - "id": pa.array([1, 2, 3], type=pa.int32()), - "user": pa.array( - [ - {"name": "Alice", "age": 30}, - {"name": "Bob", "age": 25}, - {"name": "Charlie", "age": 35}, - ], - type=pa_schema_mismatched.field("user").type, - ), - }, - schema=pa_schema_mismatched, - ) - - file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet" - fo = tbl.io.new_output(file_path) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer: - writer.write_table(pa_table) - - # Adding files with mismatched nested field IDs should fail - with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): - tbl.add_files(file_paths=[file_path]) - - @pytest.mark.integration def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: from pyiceberg.io.pyarrow import parquet_file_to_data_file @@ -707,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog tbl.add_files(file_paths=[file_path]) +@pytest.mark.integration +def test_add_files_with_field_ids_fails_on_schema_mismatch( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + """Test that files with mismatched field types (when field IDs match) are rejected.""" + identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}" + + tbl = _create_table(session_catalog, identifier, format_version) + + # All fields are renamed and reordered but have matching field IDs, so they should be compatible + # except for 'baz' which has the wrong type + WRONG_SCHEMA = pa.schema( + [ + pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}), + pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}), # Wrong type: should be int32 + pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}), + pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}), + ] + ) + file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet" + # write parquet files + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer: + writer.write_table( + pa.Table.from_pylist( + [ + { + "qux_": date(2024, 3, 7), + "baz_": "123", + "bar_": "bar_string", + "foo_": True, + }, + { + "qux_": date(2024, 3, 7), + "baz_": "124", + "bar_": "bar_string", + "foo_": True, + }, + ], + schema=WRONG_SCHEMA, + ) + ) + + expected = """Mismatch in fields: +┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ ┃ Table field ┃ Dataframe field ┃ +┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ +│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │ +│ ✅ │ 2: bar: optional string │ 2: bar_: optional string │ +│ ❌ │ 3: baz: optional int │ 3: baz_: optional string │ +│ ✅ │ 4: qux: optional date │ 4: qux_: optional date │ +└────┴──────────────────────────┴───────────────────────────┘ +""" + + with pytest.raises(ValueError, match=expected): + tbl.add_files(file_paths=[file_path]) + + @pytest.mark.integration def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: identifier = f"default.unpartitioned_with_large_types{format_version}"