Commit 1fcad93

feat(datafusion): Support INSERT INTO partitioned tables (#1827)
## Which issue does this PR close?

- Closes #1828
- Related to #1540

## What changes are included in this PR?

- Use a projection to calculate partition values for record batches
- Repartition inputs for `table_provider::insert_into`
- Initialize `partition_splitter` in `TaskWriter`'s constructor
- Use `TaskWriter` in `IcebergWriteExec` to support partitioned data

## Are these changes tested?

Added a unit test.
Parent: 776701c

6 files changed: +239 −96 lines

crates/iceberg/src/arrow/record_batch_partition_splitter.rs

Lines changed: 11 additions & 9 deletions
```diff
@@ -59,7 +59,7 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new(
+    pub fn try_new(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
         calculator: Option<PartitionValueCalculator>,
@@ -87,12 +87,12 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_computed_values(
+    pub fn try_new_with_computed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
         let calculator = PartitionValueCalculator::try_new(&partition_spec, &iceberg_schema)?;
-        Self::new(iceberg_schema, partition_spec, Some(calculator))
+        Self::try_new(iceberg_schema, partition_spec, Some(calculator))
     }

     /// Create a new RecordBatchPartitionSplitter expecting pre-computed partition values.
@@ -108,11 +108,11 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_precomputed_values(
+    pub fn try_new_with_precomputed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
-        Self::new(iceberg_schema, partition_spec, None)
+        Self::try_new(iceberg_schema, partition_spec, None)
     }

     /// Split the record batch into multiple record batches based on the partition spec.
@@ -261,9 +261,11 @@ mod tests {
                 .build()
                 .unwrap(),
         );
-        let partition_splitter =
-            RecordBatchPartitionSplitter::new_with_computed_values(schema.clone(), partition_spec)
-                .expect("Failed to create splitter");
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_computed_values(
+            schema.clone(),
+            partition_spec,
+        )
+        .expect("Failed to create splitter");

         let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
         let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
@@ -392,7 +394,7 @@ mod tests {
         ]));

         // Create splitter expecting pre-computed partition column
-        let partition_splitter = RecordBatchPartitionSplitter::new_with_precomputed_values(
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_precomputed_values(
             schema.clone(),
             partition_spec,
         )
```
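
The `new_*` → `try_new_*` renames follow the Rust convention that fallible constructors returning `Result` carry a `try_` prefix. A minimal usage sketch of the renamed constructor follows; the `split` method name and its return shape are assumptions extrapolated from the doc comment above, not confirmed by this diff:

```rust
// Sketch under assumptions: `split` and its return type are hypothetical.
let splitter = RecordBatchPartitionSplitter::try_new_with_computed_values(
    schema.clone(),  // iceberg::spec::SchemaRef
    partition_spec,  // iceberg::spec::PartitionSpecRef
)?;

// Per the doc comment, the splitter breaks one batch into per-partition
// batches; each output batch holds rows for exactly one partition value.
for (partition, batch) in splitter.split(&input_batch)? {
    println!("partition {partition:?}: {} rows", batch.num_rows());
}
```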

crates/integrations/datafusion/src/physical_plan/repartition.rs

Lines changed: 0 additions & 1 deletion
```diff
@@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
 /// NonZeroUsize::new(4).unwrap(),
 /// )?;
 /// ```
-#[allow(dead_code)]
 pub(crate) fn repartition(
     input: Arc<dyn ExecutionPlan>,
     table_metadata: TableMetadataRef,
```
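
The `#[allow(dead_code)]` attribute is dropped because `repartition` gains a caller in this commit (`IcebergTableProvider::insert_into`, below). For intuition, here is a hedged sketch of the kind of plan such a helper could build with stock DataFusion operators; hashing on the projected `_partition` column is an assumption about the strategy, not a reading of repartition.rs:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

// Hypothetical helper: hash rows by the projected `_partition` column so all
// rows carrying the same partition value land on the same output stream.
fn hash_by_partition(
    input: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    let expr = col("_partition", &input.schema())?;
    Ok(Arc::new(RepartitionExec::try_new(
        input,
        Partitioning::Hash(vec![expr], target_partitions),
    )?))
}
```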

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 23 additions & 25 deletions
```diff
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     execute_input_stream,
 };
 use futures::StreamExt;
-use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
+use iceberg::arrow::FieldMatchMode;
 use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;

 use crate::physical_plan::DATA_FILES_COL_NAME;
+use crate::task_writer::TaskWriter;
 use crate::to_datafusion_error;

 /// An execution plan node that writes data to an Iceberg table.
@@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergWriteExec does not support partitioned tables yet".to_string(),
-            ));
-        }
-
         let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();

@@ -277,31 +265,41 @@ impl ExecutionPlan for IcebergWriteExec {
         );
         let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

+        // Create TaskWriter
+        // TODO: Make fanout_enabled configurable via table properties
+        let fanout_enabled = true;
+        let schema = self.table.metadata().current_schema().clone();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
+        let task_writer = TaskWriter::try_new(
+            data_file_writer_builder,
+            fanout_enabled,
+            schema.clone(),
+            partition_spec,
+        )
+        .map_err(to_datafusion_error)?;
+
         // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
-            Arc::new(
-                schema_to_arrow_schema(self.table.metadata().current_schema())
-                    .map_err(to_datafusion_error)?,
-            ),
+            self.input.schema(), // input schema may have projected column `_partition`
             partition,
             Arc::clone(&context),
         )?;

         // Create write stream
         let stream = futures::stream::once(async move {
-            let mut writer = data_file_writer_builder
-                // todo specify partition key when partitioning writer is supported
-                .build(None)
-                .await
-                .map_err(to_datafusion_error)?;
+            let mut task_writer = task_writer;
             let mut input_stream = data;

             while let Some(batch) = input_stream.next().await {
-                writer.write(batch?).await.map_err(to_datafusion_error)?;
+                let batch = batch?;
+                task_writer
+                    .write(batch)
+                    .await
+                    .map_err(to_datafusion_error)?;
             }

-            let data_files = writer.close().await.map_err(to_datafusion_error)?;
+            let data_files = task_writer.close().await.map_err(to_datafusion_error)?;

             // Convert builders to data files and then to JSON strings
             let data_files_strs: Vec<String> = data_files
```
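
`TaskWriter` itself is added in one of the two changed files not shown in this excerpt (imported here as `crate::task_writer::TaskWriter`). Per the PR description, it initializes `partition_splitter` in its constructor; below is a minimal sketch of just that step. The import path of `RecordBatchPartitionSplitter` and the function shape are assumptions:

```rust
use iceberg::arrow::RecordBatchPartitionSplitter; // path assumed
use iceberg::spec::{PartitionSpecRef, SchemaRef};
use iceberg::Result;

// Hypothetical constructor step: build the splitter once, up front, so that
// `write` only has to route batches. Unpartitioned tables skip splitting.
fn init_partition_splitter(
    schema: SchemaRef,
    partition_spec: PartitionSpecRef,
) -> Result<Option<RecordBatchPartitionSplitter>> {
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }
    RecordBatchPartitionSplitter::try_new_with_computed_values(schema, partition_spec).map(Some)
}
```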

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 28 additions & 15 deletions
```diff
@@ -19,6 +19,7 @@ pub mod metadata_table;
 pub mod table_provider_factory;

 use std::any::Any;
+use std::num::NonZeroUsize;
 use std::sync::Arc;

 use async_trait::async_trait;
@@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;

 use crate::physical_plan::commit::IcebergCommitExec;
+use crate::physical_plan::project::project_with_partition;
+use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
 use crate::physical_plan::write::IcebergWriteExec;

@@ -170,32 +173,42 @@ impl TableProvider for IcebergTableProvider {

     async fn insert_into(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add insert into support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergTableProvider::insert_into does not support partitioned tables yet"
-                    .to_string(),
-            ));
-        }
-
         let Some(catalog) = self.catalog.clone() else {
             return Err(DataFusionError::Execution(
                 "Catalog cannot be none for insert_into".to_string(),
             ));
         };

+        let partition_spec = self.table.metadata().default_partition_spec();
+
+        // Step 1: Project partition values for partitioned tables
+        let plan_with_partition = if !partition_spec.is_unpartitioned() {
+            project_with_partition(input, &self.table)?
+        } else {
+            input
+        };
+
+        // Step 2: Repartition for parallel processing
+        let target_partitions =
+            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+                DataFusionError::Configuration(
+                    "target_partitions must be greater than 0".to_string(),
+                )
+            })?;
+
+        let repartitioned_plan = repartition(
+            plan_with_partition,
+            self.table.metadata_ref(),
+            target_partitions,
+        )?;
+
         let write_plan = Arc::new(IcebergWriteExec::new(
             self.table.clone(),
-            input,
+            repartitioned_plan,
             self.schema.clone(),
         ));
```
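
With both `NotImplemented` guards removed, an INSERT into a partitioned table now plans as `project_with_partition` → `repartition` → `IcebergWriteExec` (wrapped by `IcebergCommitExec`). A hedged end-to-end sketch; the provider setup, table name, and row values are placeholders:

```rust
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// `provider` is assumed to be an IcebergTableProvider created with a catalog
// (insert_into returns an Execution error without one) over a partitioned table.
async fn insert_partitioned(provider: Arc<dyn TableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("t", provider)?;

    // Before this commit, this failed with DataFusionError::NotImplemented for
    // partitioned tables; it now writes data files and commits them.
    ctx.sql("INSERT INTO t VALUES (1, 'a')").await?.collect().await?;
    Ok(())
}
```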
