1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

37 changes: 21 additions & 16 deletions datafusion/functions-nested/src/make_array.rs
@@ -125,19 +125,7 @@ impl ScalarUDFImpl for MakeArray {
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
return Ok(unified);
}

if let Some(unified) = type_union_resolution(arg_types) {
Ok(vec![unified; arg_types.len()])
} else {
plan_err!(
"Failed to unify argument types of {}: [{}]",
self.name(),
arg_types.iter().join(", ")
)
}
coerce_types_inner(arg_types, self.name())
}

fn documentation(&self) -> Option<&Documentation> {
@@ -163,7 +151,7 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
SingleRowListArrayBuilder::new(array).build_list_array(),
))
} else {
array_array::<i32>(arrays, data_type.clone())
array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
}
}

@@ -207,9 +195,10 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
/// col1 col2 output
/// ```
fn array_array<O: OffsetSizeTrait>(
pub fn array_array<O: OffsetSizeTrait>(
args: &[ArrayRef],
data_type: DataType,
field_name: &str,
) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
@@ -252,9 +241,25 @@ fn array_array<O: OffsetSizeTrait>(
let data = mutable.freeze();

Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new_list_field(data_type, true)),
Arc::new(Field::new(field_name, data_type, true)),
OffsetBuffer::new(offsets.into()),
arrow::array::make_array(data),
None,
)?))
}

pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
return Ok(unified);
}

if let Some(unified) = type_union_resolution(arg_types) {
Ok(vec![unified; arg_types.len()])
} else {
plan_err!(
"Failed to unify argument types of {}: [{}]",
name,
arg_types.iter().join(", ")
)
}
}
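
Reviewer note: `array_array` and `coerce_types_inner` are now `pub`, so downstream crates (such as `datafusion-spark` below) can reuse them instead of carrying private copies. A minimal usage sketch, assuming a standalone crate depending on `arrow`, `datafusion-common`, and `datafusion-functions-nested`; the `demo` function, the column values, and the `"array"` / `"element"` strings are illustrative only:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};

fn demo() -> Result<()> {
    // Two Int32 columns; `array(col1, col2)` should produce one 2-element list per row.
    let col1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let col2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));

    // The shared coercion helper unifies the argument types (trivially Int32 here).
    let coerced = coerce_types_inner(&[DataType::Int32, DataType::Int32], "array")?;
    assert_eq!(coerced.len(), 2);

    // The element field name is now a parameter: Spark can pass "element",
    // while DataFusion's make_array keeps Field::LIST_FIELD_DEFAULT_NAME ("item").
    let list = array_array::<i32>(&[col1, col2], DataType::Int32, "element")?;
    assert_eq!(list.len(), 3);
    Ok(())
}
```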
1 change: 1 addition & 0 deletions datafusion/spark/Cargo.toml
@@ -48,6 +48,7 @@ datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true, features = ["crypto_expressions"] }
datafusion-functions-nested = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
sha1 = "0.10"
135 changes: 7 additions & 128 deletions datafusion/spark/src/function/array/spark_array.rs
@@ -17,19 +17,15 @@

use std::{any::Any, sync::Arc};

use arrow::array::{
make_array, new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
MutableArrayData, NullArray, OffsetSizeTrait,
};
use arrow::buffer::OffsetBuffer;
use arrow::array::{new_null_array, Array, ArrayRef};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{internal_err, plan_datafusion_err, plan_err, Result};
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_common::{internal_err, Result};
use datafusion_expr::{
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
};
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};

use crate::function::functions_nested_utils::make_scalar_function;

@@ -38,7 +34,6 @@ const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkArray {
signature: Signature,
aliases: Vec<String>,
}

impl Default for SparkArray {
@@ -54,7 +49,6 @@ impl SparkArray {
vec![TypeSignature::UserDefined, TypeSignature::Nullary],
Volatility::Immutable,
),
aliases: vec![String::from("spark_make_array")],
}
}
}
@@ -92,10 +86,6 @@ impl ScalarUDFImpl for SparkArray {
}
}

if expr_type.is_null() {
expr_type = DataType::Int32;
}

let return_type = DataType::List(Arc::new(Field::new(
ARRAY_FIELD_DEFAULT_NAME,
expr_type,
@@ -115,30 +105,11 @@ impl ScalarUDFImpl for SparkArray {
}

fn aliases(&self) -> &[String] {
&self.aliases
&[]
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
let first_type = arg_types.first().ok_or_else(|| {
plan_datafusion_err!("Spark array function requires at least one argument")
})?;
let new_type =
arg_types
.iter()
.skip(1)
.try_fold(first_type.clone(), |acc, x| {
// The coerced types found by `comparison_coercion` are not guaranteed to be
// coercible for the arguments. `comparison_coercion` returns more loose
// types that can be coerced to both `acc` and `x` for comparison purpose.
// See `maybe_data_types` for the actual coercion.
let coerced_type = comparison_coercion(&acc, x);
if let Some(coerced_type) = coerced_type {
Ok(coerced_type)
} else {
plan_err!("Coercion from {acc} to {x} failed.")
}
})?;
Ok(vec![new_type; arg_types.len()])
coerce_types_inner(arg_types, self.name())
}
}

@@ -160,106 +131,14 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
DataType::Null => {
let length = arrays.iter().map(|a| a.len()).sum();
// By default Int32
let array = new_null_array(&DataType::Int32, length);
let array = new_null_array(&DataType::Null, length);
Ok(Arc::new(
SingleRowListArrayBuilder::new(array)
.with_nullable(true)
.with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
.build_list_array(),
))
}
_ => array_array::<i32>(arrays, data_type),
}
}

/// Convert one or more [`ArrayRef`] of the same type into a
/// `ListArray` or 'LargeListArray' depending on the offset size.
///
/// # Example (non nested)
///
/// Calling `array(col1, col2)` where col1 and col2 are non nested
/// would return a single new `ListArray`, where each row was a list
/// of 2 elements:
///
/// ```text
/// ┌─────────┐ ┌─────────┐ ┌──────────────┐
/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │
/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │
/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │
/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │
/// └─────────┘ └─────────┘ └──────────────┘
/// col1 col2 output
/// ```
///
/// # Example (nested)
///
/// Calling `array(col1, col2)` where col1 and col2 are lists
/// would return a single new `ListArray`, where each row was a list
/// of the corresponding elements of col1 and col2.
///
/// ``` text
/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐
/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │
/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │
/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │
/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │
/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │
/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │
/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
/// col1 col2 output
/// ```
fn array_array<O: OffsetSizeTrait>(
args: &[ArrayRef],
data_type: DataType,
) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return plan_err!("Array requires at least one argument");
}

let mut data = vec![];
let mut total_len = 0;
for arg in args {
let arg_data = if arg.as_any().is::<NullArray>() {
ArrayData::new_empty(&data_type)
} else {
arg.to_data()
};
total_len += arg_data.len();
data.push(arg_data);
_ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
}

let mut offsets: Vec<O> = Vec::with_capacity(total_len);
offsets.push(O::usize_as(0));

let capacity = Capacities::Array(total_len);
let data_ref = data.iter().collect::<Vec<_>>();
let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);

let num_rows = args[0].len();
for row_idx in 0..num_rows {
for (arr_idx, arg) in args.iter().enumerate() {
if !arg.as_any().is::<NullArray>()
&& !arg.is_null(row_idx)
&& arg.is_valid(row_idx)
{
mutable.extend(arr_idx, row_idx, row_idx + 1);
} else {
mutable.extend_nulls(1);
}
}
offsets.push(O::usize_as(mutable.len()));
}
let data = mutable.freeze();

Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new(ARRAY_FIELD_DEFAULT_NAME, data_type, true)),
OffsetBuffer::new(offsets.into()),
make_array(data),
None,
)?))
}
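
Reviewer note on the behavioral change for all-NULL inputs: the single-row list built for `array(NULL, ...)` now keeps a `Null` element type instead of being defaulted to `Int32`. A minimal sketch assuming only `arrow` and `datafusion-common`; the `main` function and hard-coded length are illustrative, and `"element"` mirrors `ARRAY_FIELD_DEFAULT_NAME` above:

```rust
use arrow::array::{new_null_array, Array};
use arrow::datatypes::DataType;
use datafusion_common::utils::SingleRowListArrayBuilder;

fn main() {
    // Two NULL arguments, as in `SELECT array(NULL, NULL)`.
    let nulls = new_null_array(&DataType::Null, 2);

    // Same builder calls as the DataType::Null branch above.
    let list = SingleRowListArrayBuilder::new(nulls)
        .with_nullable(true)
        .with_field_name(Some("element".to_string()))
        .build_list_array();

    // Previously the element type was forced to Int32; now it stays Null:
    // List(Field { name: "element", data_type: Null, nullable: true, .. })
    println!("{:?}", list.data_type());
}
```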