8 changes: 5 additions & 3 deletions mkdocs/docs/api.md
@@ -1006,9 +1006,11 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg

<!-- prettier-ignore-start -->

!!! note "Name Mapping"
    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

!!! note "Name Mapping and Field IDs"
`add_files` can work with Parquet files both with and without field IDs in their metadata:
    - **Files with field IDs**: When field IDs are present in the Parquet metadata, they must match the corresponding field IDs in the Iceberg table schema. This is common for files generated by tools like Spark or other libraries that write explicit field ID metadata.
- **Files without field IDs**: When field IDs are absent, the table must have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) to map field names to Iceberg field IDs. `add_files` will automatically create a Name Mapping based on the table's current schema if one doesn't already exist.
In both cases, a Name Mapping is created if the table doesn't have one, ensuring compatibility with various readers.
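
For illustration, a minimal sketch of the call itself, assuming a catalog named `default`, an existing table `default.my_table`, and Parquet files already present at the (hypothetical) paths below:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")             # hypothetical catalog name
tbl = catalog.load_table("default.my_table")  # hypothetical table identifier

# Commit the existing Parquet files to the table without rewriting any data.
# Files that carry Parquet field IDs must match the table schema by ID;
# files without field IDs are resolved by name via the table's Name Mapping,
# which add_files creates from the current schema if it is missing.
tbl.add_files(
    file_paths=[
        "s3://warehouse/default/my_table/data/file-0.parquet",
        "s3://warehouse/default/my_table/data/file-1.parquet",
    ]
)
```
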
!!! note "Partitions"
    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any transform with the `preserves_order` property set to `True` is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
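
A quick way to check whether a particular transform qualifies is its `preserves_order` property; a small sketch (transform parameters are arbitrary examples):

```python
from pyiceberg.transforms import BucketTransform, MonthTransform, TruncateTransform

# Order-preserving transforms work with add_files partition inference.
assert MonthTransform().preserves_order
assert TruncateTransform(width=10).preserves_order

# Bucket does not preserve order, so add_files cannot infer its partition values.
assert not BucketTransform(num_buckets=16).preserves_order
```
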

5 changes: 1 addition & 4 deletions pyiceberg/io/pyarrow.py
@@ -2611,6 +2611,7 @@ def _check_pyarrow_schema_compatible(
ValueError: If the schemas are not compatible.
"""
name_mapping = requested_schema.name_mapping

try:
provided_schema = pyarrow_to_schema(
provided_schema,
@@ -2641,10 +2642,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
parquet_metadata = pq.read_metadata(input_stream)

arrow_schema = parquet_metadata.schema.to_arrow_schema()
if visit_pyarrow(arrow_schema, _HasIds()):
raise NotImplementedError(
f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
)

schema = table_metadata.schema()
_check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
93 changes: 87 additions & 6 deletions tests/integration/test_add_files.py
@@ -216,23 +216,45 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_raises_has_field_ids(
def test_add_files_to_unpartitioned_table_with_field_ids(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
identifier = f"default.unpartitioned_with_field_ids_v{format_version}"
tbl = _create_table(session_catalog, identifier, format_version)

file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
# write parquet files
file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
# write parquet files with field IDs matching the table schema
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
writer.write_table(ARROW_TABLE_WITH_IDS)

# add the parquet files as data files
with pytest.raises(NotImplementedError):
tbl.add_files(file_paths=file_paths)
tbl.add_files(file_paths=file_paths)

# NameMapping should still be set even though files have field IDs
assert tbl.name_mapping() is not None

# Verify files were added successfully
rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()

assert [row.added_data_files_count for row in rows] == [5]
assert [row.existing_data_files_count for row in rows] == [0]
assert [row.deleted_data_files_count for row in rows] == [0]

# Verify data can be read back correctly
df = spark.table(identifier).toPandas()
assert len(df) == 5
assert all(df["foo"] == True) # noqa: E712
assert all(df["bar"] == "bar_string")
assert all(df["baz"] == 123)
assert all(df["qux"] == date(2024, 3, 7))


@pytest.mark.integration
@@ -579,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
tbl.add_files(file_paths=[file_path])


@pytest.mark.integration
def test_add_files_with_field_ids_fails_on_schema_mismatch(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
"""Test that files with mismatched field types (when field IDs match) are rejected."""
identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}"

tbl = _create_table(session_catalog, identifier, format_version)

# All fields are renamed and reordered but have matching field IDs, so they should be compatible
# except for 'baz' which has the wrong type
WRONG_SCHEMA = pa.schema(
[
pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}), # Wrong type: should be int32
pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
]
)
file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet"
# write parquet files
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer:
writer.write_table(
pa.Table.from_pylist(
[
{
"qux_": date(2024, 3, 7),
"baz_": "123",
"bar_": "bar_string",
"foo_": True,
},
{
"qux_": date(2024, 3, 7),
"baz_": "124",
"bar_": "bar_string",
"foo_": True,
},
],
schema=WRONG_SCHEMA,
)
)

expected = """Mismatch in fields:
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃ Table field ┃ Dataframe field ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │
│ ✅ │ 2: bar: optional string │ 2: bar_: optional string │
│ ❌ │ 3: baz: optional int │ 3: baz_: optional string │
│ ✅ │ 4: qux: optional date │ 4: qux_: optional date │
└────┴──────────────────────────┴───────────────────────────┘
"""

with pytest.raises(ValueError, match=expected):
tbl.add_files(file_paths=[file_path])


@pytest.mark.integration
def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.unpartitioned_with_large_types{format_version}"