Skip to content

Commit ca2c48d

Browse files
committed
feat: support null_treatment for aggegrate udf
1 parent 474fbe6 commit ca2c48d

File tree

5 files changed

+36
-2
lines changed

5 files changed

+36
-2
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,7 @@ message AggregateUDFExprNode {
518518
LogicalExprNode filter = 3;
519519
repeated SortExprNode order_by = 4;
520520
optional bytes fun_definition = 6;
521+
NullTreatment null_treatment = 7;
521522
}
522523

523524
message ScalarUDFExprNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,14 +587,23 @@ pub fn parse_expr(
587587
.udaf(&pb.fun_name)
588588
.or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
589589
};
590+
let null_treatment: Option<NullTreatment> =
591+
protobuf::NullTreatment::try_from(pb.null_treatment)
592+
.map_err(|_| {
593+
proto_error(format!(
594+
"Received a AggregateUdfExprNode message with unknown NullTreatment {}",
595+
pb.null_treatment
596+
))
597+
})?
598+
.into();
590599

591600
Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
592601
agg_fn,
593602
parse_exprs(&pb.args, registry, codec)?,
594603
pb.distinct,
595604
parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
596605
parse_sorts(&pb.order_by, registry, codec)?,
597-
None,
606+
null_treatment,
598607
)))
599608
}
600609

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ pub fn serialize_expr(
368368
ref distinct,
369369
ref filter,
370370
ref order_by,
371-
null_treatment: _,
371+
ref null_treatment,
372372
},
373373
}) => {
374374
let mut buf = Vec::new();
@@ -385,6 +385,8 @@ pub fn serialize_expr(
385385
},
386386
order_by: serialize_sorts(order_by, codec)?,
387387
fun_definition: (!buf.is_empty()).then_some(buf),
388+
null_treatment: protobuf::NullTreatment::from(null_treatment)
389+
.into(),
388390
},
389391
))),
390392
}

0 commit comments

Comments
 (0)