Skip to content

Commit 5fb9b9f

Browse files
committed
chore: merge make_array and spark array
1 parent 96a5f21 commit 5fb9b9f

File tree

4 files changed

+30
-142
lines changed

4 files changed

+30
-142
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/functions-nested/src/make_array.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,7 @@ impl ScalarUDFImpl for MakeArray {
125125
}
126126

127127
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
128-
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
129-
return Ok(unified);
130-
}
131-
132-
if let Some(unified) = type_union_resolution(arg_types) {
133-
Ok(vec![unified; arg_types.len()])
134-
} else {
135-
plan_err!(
136-
"Failed to unify argument types of {}: [{}]",
137-
self.name(),
138-
arg_types.iter().join(", ")
139-
)
140-
}
128+
coerce_types_inner(arg_types, self.name())
141129
}
142130

143131
fn documentation(&self) -> Option<&Documentation> {
@@ -163,7 +151,7 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
163151
SingleRowListArrayBuilder::new(array).build_list_array(),
164152
))
165153
} else {
166-
array_array::<i32>(arrays, data_type.clone())
154+
array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
167155
}
168156
}
169157

@@ -207,9 +195,10 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
207195
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
208196
/// col1 col2 output
209197
/// ```
210-
fn array_array<O: OffsetSizeTrait>(
198+
pub fn array_array<O: OffsetSizeTrait>(
211199
args: &[ArrayRef],
212200
data_type: DataType,
201+
field_name: &str,
213202
) -> Result<ArrayRef> {
214203
// do not accept 0 arguments.
215204
if args.is_empty() {
@@ -252,9 +241,25 @@ fn array_array<O: OffsetSizeTrait>(
252241
let data = mutable.freeze();
253242

254243
Ok(Arc::new(GenericListArray::<O>::try_new(
255-
Arc::new(Field::new_list_field(data_type, true)),
244+
Arc::new(Field::new(field_name, data_type, true)),
256245
OffsetBuffer::new(offsets.into()),
257246
arrow::array::make_array(data),
258247
None,
259248
)?))
260249
}
250+
251+
pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
252+
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
253+
return Ok(unified);
254+
}
255+
256+
if let Some(unified) = type_union_resolution(arg_types) {
257+
Ok(vec![unified; arg_types.len()])
258+
} else {
259+
plan_err!(
260+
"Failed to unify argument types of {}: [{}]",
261+
name,
262+
arg_types.iter().join(", ")
263+
)
264+
}
265+
}

datafusion/spark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ datafusion-common = { workspace = true }
4848
datafusion-execution = { workspace = true }
4949
datafusion-expr = { workspace = true }
5050
datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
51+
datafusion-functions-nested = { workspace = true }
5152
log = { workspace = true }
5253
rand = { workspace = true }
5354
sha1 = "0.10"

datafusion/spark/src/function/array/spark_array.rs

Lines changed: 7 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717

1818
use std::{any::Any, sync::Arc};
1919

20-
use arrow::array::{
21-
make_array, new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
22-
MutableArrayData, NullArray, OffsetSizeTrait,
23-
};
24-
use arrow::buffer::OffsetBuffer;
20+
use arrow::array::{new_null_array, Array, ArrayRef};
2521
use arrow::datatypes::{DataType, Field, FieldRef};
2622
use datafusion_common::utils::SingleRowListArrayBuilder;
27-
use datafusion_common::{internal_err, plan_datafusion_err, plan_err, Result};
28-
use datafusion_expr::type_coercion::binary::comparison_coercion;
23+
use datafusion_common::{internal_err, Result};
2924
use datafusion_expr::{
3025
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
3126
TypeSignature, Volatility,
3227
};
28+
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};
3329

3430
use crate::function::functions_nested_utils::make_scalar_function;
3531

@@ -54,7 +50,7 @@ impl SparkArray {
5450
vec![TypeSignature::UserDefined, TypeSignature::Nullary],
5551
Volatility::Immutable,
5652
),
57-
aliases: vec![String::from("spark_make_array")],
53+
aliases: vec![],
5854
}
5955
}
6056
}
@@ -92,10 +88,6 @@ impl ScalarUDFImpl for SparkArray {
9288
}
9389
}
9490

95-
if expr_type.is_null() {
96-
expr_type = DataType::Int32;
97-
}
98-
9991
let return_type = DataType::List(Arc::new(Field::new(
10092
ARRAY_FIELD_DEFAULT_NAME,
10193
expr_type,
@@ -119,26 +111,7 @@ impl ScalarUDFImpl for SparkArray {
119111
}
120112

121113
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
122-
let first_type = arg_types.first().ok_or_else(|| {
123-
plan_datafusion_err!("Spark array function requires at least one argument")
124-
})?;
125-
let new_type =
126-
arg_types
127-
.iter()
128-
.skip(1)
129-
.try_fold(first_type.clone(), |acc, x| {
130-
// The coerced types found by `comparison_coercion` are not guaranteed to be
131-
// coercible for the arguments. `comparison_coercion` returns more loose
132-
// types that can be coerced to both `acc` and `x` for comparison purpose.
133-
// See `maybe_data_types` for the actual coercion.
134-
let coerced_type = comparison_coercion(&acc, x);
135-
if let Some(coerced_type) = coerced_type {
136-
Ok(coerced_type)
137-
} else {
138-
plan_err!("Coercion from {acc} to {x} failed.")
139-
}
140-
})?;
141-
Ok(vec![new_type; arg_types.len()])
114+
coerce_types_inner(arg_types, self.name())
142115
}
143116
}
144117

@@ -160,106 +133,14 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
160133
DataType::Null => {
161134
let length = arrays.iter().map(|a| a.len()).sum();
162135
// All inputs are Null-typed: keep the element type as Null (no Int32 fallback)
163-
let array = new_null_array(&DataType::Int32, length);
136+
let array = new_null_array(&DataType::Null, length);
164137
Ok(Arc::new(
165138
SingleRowListArrayBuilder::new(array)
166139
.with_nullable(true)
167140
.with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
168141
.build_list_array(),
169142
))
170143
}
171-
_ => array_array::<i32>(arrays, data_type),
172-
}
173-
}
174-
175-
/// Convert one or more [`ArrayRef`] of the same type into a
176-
/// `ListArray` or 'LargeListArray' depending on the offset size.
177-
///
178-
/// # Example (non nested)
179-
///
180-
/// Calling `array(col1, col2)` where col1 and col2 are non nested
181-
/// would return a single new `ListArray`, where each row was a list
182-
/// of 2 elements:
183-
///
184-
/// ```text
185-
/// ┌─────────┐ ┌─────────┐ ┌──────────────┐
186-
/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │
187-
/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │
188-
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
189-
/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │
190-
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
191-
/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │
192-
/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │
193-
/// └─────────┘ └─────────┘ └──────────────┘
194-
/// col1 col2 output
195-
/// ```
196-
///
197-
/// # Example (nested)
198-
///
199-
/// Calling `array(col1, col2)` where col1 and col2 are lists
200-
/// would return a single new `ListArray`, where each row was a list
201-
/// of the corresponding elements of col1 and col2.
202-
///
203-
/// ``` text
204-
/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐
205-
/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │
206-
/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │
207-
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │
208-
/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │
209-
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │
210-
/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │
211-
/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │
212-
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
213-
/// col1 col2 output
214-
/// ```
215-
fn array_array<O: OffsetSizeTrait>(
216-
args: &[ArrayRef],
217-
data_type: DataType,
218-
) -> Result<ArrayRef> {
219-
// do not accept 0 arguments.
220-
if args.is_empty() {
221-
return plan_err!("Array requires at least one argument");
222-
}
223-
224-
let mut data = vec![];
225-
let mut total_len = 0;
226-
for arg in args {
227-
let arg_data = if arg.as_any().is::<NullArray>() {
228-
ArrayData::new_empty(&data_type)
229-
} else {
230-
arg.to_data()
231-
};
232-
total_len += arg_data.len();
233-
data.push(arg_data);
144+
_ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
234145
}
235-
236-
let mut offsets: Vec<O> = Vec::with_capacity(total_len);
237-
offsets.push(O::usize_as(0));
238-
239-
let capacity = Capacities::Array(total_len);
240-
let data_ref = data.iter().collect::<Vec<_>>();
241-
let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
242-
243-
let num_rows = args[0].len();
244-
for row_idx in 0..num_rows {
245-
for (arr_idx, arg) in args.iter().enumerate() {
246-
if !arg.as_any().is::<NullArray>()
247-
&& !arg.is_null(row_idx)
248-
&& arg.is_valid(row_idx)
249-
{
250-
mutable.extend(arr_idx, row_idx, row_idx + 1);
251-
} else {
252-
mutable.extend_nulls(1);
253-
}
254-
}
255-
offsets.push(O::usize_as(mutable.len()));
256-
}
257-
let data = mutable.freeze();
258-
259-
Ok(Arc::new(GenericListArray::<O>::try_new(
260-
Arc::new(Field::new(ARRAY_FIELD_DEFAULT_NAME, data_type, true)),
261-
OffsetBuffer::new(offsets.into()),
262-
make_array(data),
263-
None,
264-
)?))
265146
}

0 commit comments

Comments
 (0)