|  | 
| 47 | 47 |     LongType, | 
| 48 | 48 |     NestedField, | 
| 49 | 49 |     StringType, | 
|  | 50 | +    StructType, | 
| 50 | 51 |     TimestampType, | 
| 51 | 52 |     TimestamptzType, | 
| 52 | 53 | ) | 
| @@ -216,23 +217,136 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found( | 
| 216 | 217 | 
 | 
| 217 | 218 | 
 | 
| 218 | 219 | @pytest.mark.integration | 
| 219 |  | -def test_add_files_to_unpartitioned_table_raises_has_field_ids( | 
|  | 220 | +def test_add_files_to_unpartitioned_table_with_field_ids( | 
| 220 | 221 |     spark: SparkSession, session_catalog: Catalog, format_version: int | 
| 221 | 222 | ) -> None: | 
| 222 |  | -    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}" | 
|  | 223 | +    identifier = f"default.unpartitioned_with_field_ids_v{format_version}" | 
| 223 | 224 |     tbl = _create_table(session_catalog, identifier, format_version) | 
| 224 | 225 | 
 | 
| 225 |  | -    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)] | 
| 226 |  | -    # write parquet files | 
|  | 226 | +    file_paths = [ | 
|  | 227 | +        f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5) | 
|  | 228 | +    ] | 
|  | 229 | +    # write parquet files with field IDs matching the table schema | 
| 227 | 230 |     for file_path in file_paths: | 
| 228 | 231 |         fo = tbl.io.new_output(file_path) | 
| 229 | 232 |         with fo.create(overwrite=True) as fos: | 
| 230 | 233 |             with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer: | 
| 231 | 234 |                 writer.write_table(ARROW_TABLE_WITH_IDS) | 
| 232 | 235 | 
 | 
| 233 |  | -    # add the parquet files as data files | 
| 234 |  | -    with pytest.raises(NotImplementedError): | 
| 235 |  | -        tbl.add_files(file_paths=file_paths) | 
|  | 236 | +    # add the parquet files as data files - should succeed now that field IDs are supported | 
|  | 237 | +    tbl.add_files(file_paths=file_paths) | 
|  | 238 | + | 
|  | 239 | +    # NameMapping should still be set even though the files have field IDs | 
|  | 240 | +    assert tbl.name_mapping() is not None | 
|  | 241 | + | 
|  | 242 | +    # Verify files were added successfully | 
|  | 243 | +    rows = spark.sql( | 
|  | 244 | +        f""" | 
|  | 245 | +        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count | 
|  | 246 | +        FROM {identifier}.all_manifests | 
|  | 247 | +    """ | 
|  | 248 | +    ).collect() | 
|  | 249 | + | 
|  | 250 | +    assert [row.added_data_files_count for row in rows] == [5] | 
|  | 251 | +    assert [row.existing_data_files_count for row in rows] == [0] | 
|  | 252 | +    assert [row.deleted_data_files_count for row in rows] == [0] | 
|  | 253 | + | 
|  | 254 | +    # Verify data can be read back correctly | 
|  | 255 | +    df = spark.table(identifier).toPandas() | 
|  | 256 | +    assert len(df) == 5 | 
|  | 257 | +    assert all(df["foo"] == True)  # noqa: E712 | 
|  | 258 | +    assert all(df["bar"] == "bar_string") | 
|  | 259 | +    assert all(df["baz"] == 123) | 
|  | 260 | +    assert all(df["qux"] == date(2024, 3, 7)) | 
|  | 261 | + | 
|  | 262 | + | 
|  | 263 | +@pytest.mark.integration | 
|  | 264 | +def test_add_files_with_mismatched_field_ids( | 
|  | 265 | +    spark: SparkSession, session_catalog: Catalog, format_version: int | 
|  | 266 | +) -> None: | 
|  | 267 | +    identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}" | 
|  | 268 | +    tbl = _create_table(session_catalog, identifier, format_version) | 
|  | 269 | + | 
|  | 270 | +    # Create schema with field IDs that don't match the table schema | 
|  | 271 | +    # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog) | 
|  | 272 | +    # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux) | 
|  | 273 | +    mismatched_schema = pa.schema( | 
|  | 274 | +        [ | 
|  | 275 | +            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}), | 
|  | 276 | +            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), | 
|  | 277 | +            pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}),  # Wrong: should be 3 | 
|  | 278 | +            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}),  # Wrong: should be 4 | 
|  | 279 | +        ] | 
|  | 280 | +    ) | 
|  | 281 | + | 
|  | 282 | +    file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet" | 
|  | 283 | +    fo = tbl.io.new_output(file_path) | 
|  | 284 | +    with fo.create(overwrite=True) as fos: | 
|  | 285 | +        with pq.ParquetWriter(fos, schema=mismatched_schema) as writer: | 
|  | 286 | +            writer.write_table(ARROW_TABLE_WITH_IDS) | 
|  | 287 | + | 
|  | 288 | +    # Adding files with mismatched field IDs should fail | 
|  | 289 | +    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): | 
|  | 290 | +        tbl.add_files(file_paths=[file_path]) | 
|  | 291 | + | 
|  | 292 | + | 
|  | 293 | +@pytest.mark.integration | 
|  | 294 | +def test_add_files_with_mismatched_nested_field_ids( | 
|  | 295 | +    spark: SparkSession, session_catalog: Catalog, format_version: int | 
|  | 296 | +) -> None: | 
|  | 297 | +    """Test that files with mismatched nested (struct) field IDs are rejected.""" | 
|  | 298 | +    identifier = f"default.nested_mismatched_field_ids_v{format_version}" | 
|  | 299 | + | 
|  | 300 | +    # Create a table with a nested struct field | 
|  | 301 | +    try: | 
|  | 302 | +        session_catalog.drop_table(identifier=identifier) | 
|  | 303 | +    except NoSuchTableError: | 
|  | 304 | +        pass | 
|  | 305 | + | 
|  | 306 | +    nested_schema = Schema( | 
|  | 307 | +        NestedField(1, "id", IntegerType(), required=False), | 
|  | 308 | +        NestedField(2, "user", StructType( | 
|  | 309 | +            NestedField(3, "name", StringType(), required=False), | 
|  | 310 | +            NestedField(4, "age", IntegerType(), required=False), | 
|  | 311 | +        ), required=False), | 
|  | 312 | +        schema_id=0, | 
|  | 313 | +    ) | 
|  | 314 | + | 
|  | 315 | +    tbl = session_catalog.create_table( | 
|  | 316 | +        identifier=identifier, | 
|  | 317 | +        schema=nested_schema, | 
|  | 318 | +        properties={"format-version": str(format_version)}, | 
|  | 319 | +    ) | 
|  | 320 | + | 
|  | 321 | +    # Create PyArrow schema with MISMATCHED nested field IDs | 
|  | 322 | +    # The table expects: id=1, user=2, user.name=3, user.age=4 | 
|  | 323 | +    # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs) | 
|  | 324 | +    pa_schema_mismatched = pa.schema([ | 
|  | 325 | +        pa.field("id", pa.int32(), nullable=True, metadata={"PARQUET:field_id": "1"}), | 
|  | 326 | +        pa.field("user", pa.struct([ | 
|  | 327 | +            pa.field("name", pa.string(), nullable=True, metadata={"PARQUET:field_id": "99"}),  # Wrong: should be 3 | 
|  | 328 | +            pa.field("age", pa.int32(), nullable=True, metadata={"PARQUET:field_id": "100"}),  # Wrong: should be 4 | 
|  | 329 | +        ]), nullable=True, metadata={"PARQUET:field_id": "2"}), | 
|  | 330 | +    ]) | 
|  | 331 | + | 
|  | 332 | +    pa_table = pa.table({ | 
|  | 333 | +        "id": pa.array([1, 2, 3], type=pa.int32()), | 
|  | 334 | +        "user": pa.array([ | 
|  | 335 | +            {"name": "Alice", "age": 30}, | 
|  | 336 | +            {"name": "Bob", "age": 25}, | 
|  | 337 | +            {"name": "Charlie", "age": 35}, | 
|  | 338 | +        ], type=pa_schema_mismatched.field("user").type), | 
|  | 339 | +    }, schema=pa_schema_mismatched) | 
|  | 340 | + | 
|  | 341 | +    file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet" | 
|  | 342 | +    fo = tbl.io.new_output(file_path) | 
|  | 343 | +    with fo.create(overwrite=True) as fos: | 
|  | 344 | +        with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer: | 
|  | 345 | +            writer.write_table(pa_table) | 
|  | 346 | + | 
|  | 347 | +    # Adding files with mismatched nested field IDs should fail | 
|  | 348 | +    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): | 
|  | 349 | +        tbl.add_files(file_paths=[file_path]) | 
| 236 | 350 | 
 | 
| 237 | 351 | 
 | 
| 238 | 352 | @pytest.mark.integration | 
|  | 
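Reviewer's note on the mechanism these tests pin down: `add_files` has to read the `PARQUET:field_id` metadata out of each file's Arrow schema, including fields nested inside structs, and compare those IDs against the ones the Iceberg table schema assigns. Below is a minimal sketch of that comparison, not the PR's implementation: the helper names (`_collect_field_ids`, `_check_field_ids`) and the dotted-path bookkeeping are illustrative assumptions; only the error message is taken from the tests above.

```python
import pyarrow as pa

FIELD_ID_KEY = b"PARQUET:field_id"


def _collect_field_ids(schema: pa.Schema) -> dict:
    """Map dotted field paths (e.g. "user.name") to their Parquet field IDs."""
    ids = {}

    def visit(field: pa.Field, prefix: str) -> None:
        path = prefix + field.name
        raw = (field.metadata or {}).get(FIELD_ID_KEY)
        if raw is not None:
            ids[path] = int(raw)
        # Recurse into struct children so nested IDs (user.name, user.age) are checked too.
        if pa.types.is_struct(field.type):
            for i in range(field.type.num_fields):
                visit(field.type.field(i), path + ".")

    for top_level in schema:
        visit(top_level, "")
    return ids


def _check_field_ids(file_schema: pa.Schema, expected: dict) -> None:
    """Raise if any field ID in the file disagrees with the table schema."""
    actual = _collect_field_ids(file_schema)
    for path, field_id in expected.items():
        if actual.get(path) != field_id:
            raise ValueError("Field IDs in Parquet file do not match table schema")
```

Run against `pa_schema_mismatched` from the nested test, `_collect_field_ids` returns `{"id": 1, "user": 2, "user.name": 99, "user.age": 100}`; with the table expecting `user.name=3` and `user.age=4`, `_check_field_ids` raises the `ValueError` that `pytest.raises` matches.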