Skip to content

Commit 2217c2b

Browse files
authored
chore: merge make_array and spark array (#19006)
## Which issue does this PR close?

Part of #17964.

## Rationale for this change

Merge the duplicated array-construction logic so that `make_array` and the Spark `array` function share one implementation.

## What changes are included in this PR?

- Fix the return type of the Spark `array` function when the element data type is null, to be consistent with the type returned by Spark.
- Reuse the functions shared by both `make_array` and the Spark `array` function (`array_array` and `coerce_types_inner`).

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.
1 parent 09fe307 commit 2217c2b

File tree

4 files changed

+30
-144
lines changed

4 files changed

+30
-144
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/functions-nested/src/make_array.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,7 @@ impl ScalarUDFImpl for MakeArray {
125125
}
126126

127127
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
128-
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
129-
return Ok(unified);
130-
}
131-
132-
if let Some(unified) = type_union_resolution(arg_types) {
133-
Ok(vec![unified; arg_types.len()])
134-
} else {
135-
plan_err!(
136-
"Failed to unify argument types of {}: [{}]",
137-
self.name(),
138-
arg_types.iter().join(", ")
139-
)
140-
}
128+
coerce_types_inner(arg_types, self.name())
141129
}
142130

143131
fn documentation(&self) -> Option<&Documentation> {
@@ -163,7 +151,7 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
163151
SingleRowListArrayBuilder::new(array).build_list_array(),
164152
))
165153
} else {
166-
array_array::<i32>(arrays, data_type.clone())
154+
array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
167155
}
168156
}
169157

@@ -207,9 +195,10 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
207195
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
208196
/// col1 col2 output
209197
/// ```
210-
fn array_array<O: OffsetSizeTrait>(
198+
pub fn array_array<O: OffsetSizeTrait>(
211199
args: &[ArrayRef],
212200
data_type: DataType,
201+
field_name: &str,
213202
) -> Result<ArrayRef> {
214203
// do not accept 0 arguments.
215204
if args.is_empty() {
@@ -252,9 +241,25 @@ fn array_array<O: OffsetSizeTrait>(
252241
let data = mutable.freeze();
253242

254243
Ok(Arc::new(GenericListArray::<O>::try_new(
255-
Arc::new(Field::new_list_field(data_type, true)),
244+
Arc::new(Field::new(field_name, data_type, true)),
256245
OffsetBuffer::new(offsets.into()),
257246
arrow::array::make_array(data),
258247
None,
259248
)?))
260249
}
250+
251+
pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
252+
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
253+
return Ok(unified);
254+
}
255+
256+
if let Some(unified) = type_union_resolution(arg_types) {
257+
Ok(vec![unified; arg_types.len()])
258+
} else {
259+
plan_err!(
260+
"Failed to unify argument types of {}: [{}]",
261+
name,
262+
arg_types.iter().join(", ")
263+
)
264+
}
265+
}

datafusion/spark/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ datafusion-common = { workspace = true }
4848
datafusion-execution = { workspace = true }
4949
datafusion-expr = { workspace = true }
5050
datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
51+
datafusion-functions-nested = { workspace = true }
5152
log = { workspace = true }
5253
rand = { workspace = true }
5354
sha1 = "0.10"

datafusion/spark/src/function/array/spark_array.rs

Lines changed: 7 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717

1818
use std::{any::Any, sync::Arc};
1919

20-
use arrow::array::{
21-
make_array, new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
22-
MutableArrayData, NullArray, OffsetSizeTrait,
23-
};
24-
use arrow::buffer::OffsetBuffer;
20+
use arrow::array::{new_null_array, Array, ArrayRef};
2521
use arrow::datatypes::{DataType, Field, FieldRef};
2622
use datafusion_common::utils::SingleRowListArrayBuilder;
27-
use datafusion_common::{internal_err, plan_datafusion_err, plan_err, Result};
28-
use datafusion_expr::type_coercion::binary::comparison_coercion;
23+
use datafusion_common::{internal_err, Result};
2924
use datafusion_expr::{
3025
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
3126
TypeSignature, Volatility,
3227
};
28+
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};
3329

3430
use crate::function::functions_nested_utils::make_scalar_function;
3531

@@ -38,7 +34,6 @@ const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
3834
#[derive(Debug, PartialEq, Eq, Hash)]
3935
pub struct SparkArray {
4036
signature: Signature,
41-
aliases: Vec<String>,
4237
}
4338

4439
impl Default for SparkArray {
@@ -54,7 +49,6 @@ impl SparkArray {
5449
vec![TypeSignature::UserDefined, TypeSignature::Nullary],
5550
Volatility::Immutable,
5651
),
57-
aliases: vec![String::from("spark_make_array")],
5852
}
5953
}
6054
}
@@ -92,10 +86,6 @@ impl ScalarUDFImpl for SparkArray {
9286
}
9387
}
9488

95-
if expr_type.is_null() {
96-
expr_type = DataType::Int32;
97-
}
98-
9989
let return_type = DataType::List(Arc::new(Field::new(
10090
ARRAY_FIELD_DEFAULT_NAME,
10191
expr_type,
@@ -115,30 +105,11 @@ impl ScalarUDFImpl for SparkArray {
115105
}
116106

117107
fn aliases(&self) -> &[String] {
118-
&self.aliases
108+
&[]
119109
}
120110

121111
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
122-
let first_type = arg_types.first().ok_or_else(|| {
123-
plan_datafusion_err!("Spark array function requires at least one argument")
124-
})?;
125-
let new_type =
126-
arg_types
127-
.iter()
128-
.skip(1)
129-
.try_fold(first_type.clone(), |acc, x| {
130-
// The coerced types found by `comparison_coercion` are not guaranteed to be
131-
// coercible for the arguments. `comparison_coercion` returns more loose
132-
// types that can be coerced to both `acc` and `x` for comparison purpose.
133-
// See `maybe_data_types` for the actual coercion.
134-
let coerced_type = comparison_coercion(&acc, x);
135-
if let Some(coerced_type) = coerced_type {
136-
Ok(coerced_type)
137-
} else {
138-
plan_err!("Coercion from {acc} to {x} failed.")
139-
}
140-
})?;
141-
Ok(vec![new_type; arg_types.len()])
112+
coerce_types_inner(arg_types, self.name())
142113
}
143114
}
144115

@@ -160,106 +131,14 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
160131
DataType::Null => {
161132
let length = arrays.iter().map(|a| a.len()).sum();
162133
// By default Int32
163-
let array = new_null_array(&DataType::Int32, length);
134+
let array = new_null_array(&DataType::Null, length);
164135
Ok(Arc::new(
165136
SingleRowListArrayBuilder::new(array)
166137
.with_nullable(true)
167138
.with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
168139
.build_list_array(),
169140
))
170141
}
171-
_ => array_array::<i32>(arrays, data_type),
172-
}
173-
}
174-
175-
/// Convert one or more [`ArrayRef`] of the same type into a
176-
/// `ListArray` or 'LargeListArray' depending on the offset size.
177-
///
178-
/// # Example (non nested)
179-
///
180-
/// Calling `array(col1, col2)` where col1 and col2 are non nested
181-
/// would return a single new `ListArray`, where each row was a list
182-
/// of 2 elements:
183-
///
184-
/// ```text
185-
/// ┌─────────┐ ┌─────────┐ ┌──────────────┐
186-
/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │
187-
/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │
188-
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
189-
/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │
190-
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
191-
/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │
192-
/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │
193-
/// └─────────┘ └─────────┘ └──────────────┘
194-
/// col1 col2 output
195-
/// ```
196-
///
197-
/// # Example (nested)
198-
///
199-
/// Calling `array(col1, col2)` where col1 and col2 are lists
200-
/// would return a single new `ListArray`, where each row was a list
201-
/// of the corresponding elements of col1 and col2.
202-
///
203-
/// ``` text
204-
/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐
205-
/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │
206-
/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │
207-
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │
208-
/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │
209-
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │
210-
/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │
211-
/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │
212-
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
213-
/// col1 col2 output
214-
/// ```
215-
fn array_array<O: OffsetSizeTrait>(
216-
args: &[ArrayRef],
217-
data_type: DataType,
218-
) -> Result<ArrayRef> {
219-
// do not accept 0 arguments.
220-
if args.is_empty() {
221-
return plan_err!("Array requires at least one argument");
222-
}
223-
224-
let mut data = vec![];
225-
let mut total_len = 0;
226-
for arg in args {
227-
let arg_data = if arg.as_any().is::<NullArray>() {
228-
ArrayData::new_empty(&data_type)
229-
} else {
230-
arg.to_data()
231-
};
232-
total_len += arg_data.len();
233-
data.push(arg_data);
142+
_ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
234143
}
235-
236-
let mut offsets: Vec<O> = Vec::with_capacity(total_len);
237-
offsets.push(O::usize_as(0));
238-
239-
let capacity = Capacities::Array(total_len);
240-
let data_ref = data.iter().collect::<Vec<_>>();
241-
let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
242-
243-
let num_rows = args[0].len();
244-
for row_idx in 0..num_rows {
245-
for (arr_idx, arg) in args.iter().enumerate() {
246-
if !arg.as_any().is::<NullArray>()
247-
&& !arg.is_null(row_idx)
248-
&& arg.is_valid(row_idx)
249-
{
250-
mutable.extend(arr_idx, row_idx, row_idx + 1);
251-
} else {
252-
mutable.extend_nulls(1);
253-
}
254-
}
255-
offsets.push(O::usize_as(mutable.len()));
256-
}
257-
let data = mutable.freeze();
258-
259-
Ok(Arc::new(GenericListArray::<O>::try_new(
260-
Arc::new(Field::new(ARRAY_FIELD_DEFAULT_NAME, data_type, true)),
261-
OffsetBuffer::new(offsets.into()),
262-
make_array(data),
263-
None,
264-
)?))
265144
}

0 commit comments

Comments
 (0)