Skip to content

Commit 346c94c

Browse files
authored
chore: refine UDF transport error messaging (#18910)
* refine udf transport error messaging
* tests: use runtime spawn in udf transport test
* fix: align udf schema mismatch messaging
1 parent a6ac9a5 commit 346c94c

File tree

4 files changed

+472
-8
lines changed

4 files changed

+472
-8
lines changed

src/query/expression/src/utils/udf_client.rs

Lines changed: 175 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::error::Error as StdError;
1516
use std::str::FromStr;
1617
use std::sync::Arc;
1718
use std::time::Duration;
@@ -20,8 +21,10 @@ use std::time::Instant;
2021
use arrow_array::RecordBatch;
2122
use arrow_flight::decode::FlightRecordBatchStream;
2223
use arrow_flight::encode::FlightDataEncoderBuilder;
24+
use arrow_flight::error::FlightError;
2325
use arrow_flight::flight_service_client::FlightServiceClient;
2426
use arrow_flight::FlightDescriptor;
27+
use arrow_schema::ArrowError;
2528
use arrow_select::concat::concat_batches;
2629
use databend_common_base::headers::HEADER_FUNCTION;
2730
use databend_common_base::headers::HEADER_FUNCTION_HANDLER;
@@ -49,6 +52,7 @@ use tonic::transport::channel::Channel;
4952
use tonic::transport::ClientTlsConfig;
5053
use tonic::transport::Endpoint;
5154
use tonic::Request;
55+
use tonic::Status;
5256

5357
use crate::types::DataType;
5458
use crate::variant_transform::contains_variant;
@@ -64,6 +68,26 @@ const UDF_KEEP_ALIVE_TIMEOUT_SEC: u64 = 20;
6468
// The decoding message size limit is 4MB by default; we raise it to 16GB.
6569
// max_encoding_message_size is usize::max by default
6670
const MAX_DECODING_MESSAGE_SIZE: usize = 16 * 1024 * 1024 * 1024;
71+
/// Lowercase message fragments that identify a low-level transport failure, used to
/// swap brittle transport errors for friendlier messaging.
///
/// Entries must stay lowercase because messages are lowercased before being searched.
/// Keep the list up to date as dependencies evolve, and add new patterns when gaps appear.
const TRANSPORT_ERROR_SNIPPETS: &[&str] = &[
    // HTTP/2 stream and body failures.
    "h2 protocol error",
    "error reading a body from connection",
    // Socket-level failures.
    "broken pipe",
    "connection reset",
    "connection refused",
    // Routing failures.
    "network is unreachable",
    "no route to host",
];
82+
83+
/// Coarse classification of a failure seen while decoding a UDF's Flight response.
/// The variant selects the wording of the error reported to the user.
#[derive(Debug)]
enum FlightDecodeIssue {
    /// The error text matched one of the known transport-failure fragments.
    TransportInterrupted,
    /// A non-transport message, carried verbatim so it can be shown to the user.
    ServerStatus(String),
    /// Arrow reported a schema error.
    SchemaMismatch,
    /// The payload could not be decoded or parsed.
    MalformedData,
    /// Anything that fits none of the categories above.
    Other,
}
6791

6892
#[derive(Debug, Clone)]
6993
pub struct UDFFlightClient {
@@ -336,8 +360,7 @@ impl UDFFlightClient {
336360

337361
if result_fields[0].data_type() != return_type {
338362
return Err(ErrorCode::UDFSchemaMismatch(format!(
339-
"UDF server return incorrect type, expected: {}, but got: {}",
340-
return_type,
363+
"The user-defined function \"{func_name}\" returned an unexpected schema. Expected result type {return_type}, but got {}.",
341364
result_fields[0].data_type()
342365
)));
343366
}
@@ -380,11 +403,7 @@ impl UDFFlightClient {
380403
let record_batch_stream = FlightRecordBatchStream::new_from_flight_data(
381404
flight_data_stream.map_err(|err| err.into()),
382405
)
383-
.map_err(|err| {
384-
ErrorCode::UDFDataError(format!(
385-
"Decode record batch failed on UDF function {func_name}: {err}"
386-
))
387-
});
406+
.map_err(|err| handle_flight_decode_error(func_name, err));
388407

389408
let batches: Vec<RecordBatch> = record_batch_stream.try_collect().await?;
390409
if batches.is_empty() {
@@ -399,6 +418,88 @@ impl UDFFlightClient {
399418
}
400419
}
401420

421+
fn handle_flight_decode_error(func_name: &str, err: FlightError) -> ErrorCode {
422+
let issue = classify_flight_error(&err);
423+
let err_text = err.to_string();
424+
425+
match issue {
426+
FlightDecodeIssue::TransportInterrupted => ErrorCode::UDFDataError(format!(
427+
"The user-defined function \"{func_name}\" stopped responding before it finished. Retry the query; if it keeps failing, ensure the UDF server is running or review its logs. (details: {err_text})"
428+
)),
429+
FlightDecodeIssue::ServerStatus(status) => ErrorCode::UDFDataError(format!(
430+
"The user-defined function \"{func_name}\" reported an error: {status}. Review the UDF server logs."
431+
)),
432+
FlightDecodeIssue::SchemaMismatch => ErrorCode::UDFDataError(format!(
433+
"The user-defined function \"{func_name}\" returned an unexpected schema. Ensure the UDF definition matches the server output. (details: {err_text})"
434+
)),
435+
FlightDecodeIssue::MalformedData => ErrorCode::UDFDataError(format!(
436+
"The user-defined function \"{func_name}\" returned data that Databend could not parse. Check the UDF implementation or its logs. (details: {err_text})"
437+
)),
438+
FlightDecodeIssue::Other => ErrorCode::UDFDataError(format!(
439+
"Decode record batch failed on UDF function \"{func_name}\": {err_text}"
440+
)),
441+
}
442+
}
443+
444+
fn classify_flight_error(err: &FlightError) -> FlightDecodeIssue {
445+
match err {
446+
FlightError::Arrow(arrow_err) => classify_arrow_error(arrow_err),
447+
FlightError::Tonic(status) => classify_status(status),
448+
FlightError::ExternalError(source) => classify_external_error(source.as_ref()),
449+
FlightError::ProtocolError(_) | FlightError::DecodeError(_) => {
450+
FlightDecodeIssue::MalformedData
451+
}
452+
FlightError::NotYetImplemented(_) => FlightDecodeIssue::Other,
453+
}
454+
}
455+
456+
fn classify_arrow_error(err: &ArrowError) -> FlightDecodeIssue {
457+
match err {
458+
ArrowError::SchemaError(_) => FlightDecodeIssue::SchemaMismatch,
459+
ArrowError::ExternalError(source) => classify_external_error(source.as_ref()),
460+
ArrowError::IoError(message, _) => classify_error_message(message),
461+
ArrowError::ParseError(_)
462+
| ArrowError::InvalidArgumentError(_)
463+
| ArrowError::ComputeError(_)
464+
| ArrowError::JsonError(_)
465+
| ArrowError::CsvError(_)
466+
| ArrowError::IpcError(_)
467+
| ArrowError::CDataInterface(_)
468+
| ArrowError::ParquetError(_) => FlightDecodeIssue::MalformedData,
469+
_ => FlightDecodeIssue::Other,
470+
}
471+
}
472+
473+
fn classify_status(status: &Status) -> FlightDecodeIssue {
474+
classify_error_message(status.message())
475+
}
476+
477+
fn classify_external_error(error: &(dyn StdError + Send + Sync + 'static)) -> FlightDecodeIssue {
478+
if let Some(arrow_err) = error.downcast_ref::<ArrowError>() {
479+
classify_arrow_error(arrow_err)
480+
} else if let Some(status) = error.downcast_ref::<Status>() {
481+
classify_status(status)
482+
} else if let Some(io_error) = error.downcast_ref::<std::io::Error>() {
483+
classify_error_message(&io_error.to_string())
484+
} else {
485+
classify_error_message(&error.to_string())
486+
}
487+
}
488+
489+
fn classify_error_message(message: &str) -> FlightDecodeIssue {
490+
if is_transport_error_message(message) {
491+
FlightDecodeIssue::TransportInterrupted
492+
} else {
493+
FlightDecodeIssue::ServerStatus(message.to_string())
494+
}
495+
}
496+
497+
pub fn is_transport_error_message(message: &str) -> bool {
498+
let lower = message.to_ascii_lowercase();
499+
TRANSPORT_ERROR_SNIPPETS
500+
.iter()
501+
.any(|snippet| lower.contains(snippet))
502+
}
402503
pub fn error_kind(message: &str) -> &str {
403504
let message = message.to_ascii_lowercase();
404505
if message.contains("timeout") || message.contains("timedout") {
@@ -418,3 +519,70 @@ pub fn error_kind(message: &str) -> &str {
418519
"Other"
419520
}
420521
}
522+
523+
#[cfg(test)]
mod tests {
    use tonic::Code;

    use super::*;

    /// Passes `err` through `handle_flight_decode_error` for a fixed UDF name and
    /// asserts that the resulting message contains `expected`. `context` prefixes the
    /// failure output.
    fn assert_decode_message(err: FlightError, expected: &str, context: &str) {
        let err = handle_flight_decode_error("test_udf", err);
        let message = err.message();
        assert!(message.contains(expected), "{context}: {message}");
    }

    /// Builds a Tonic-backed Flight error with `Code::Internal` and the given text.
    fn internal_status(text: &str) -> FlightError {
        FlightError::Tonic(Box::new(Status::new(Code::Internal, text)))
    }

    #[test]
    fn transport_error_returns_interrupt_hint() {
        assert_decode_message(
            internal_status("h2 protocol error: error reading a body from connection"),
            "stopped responding before it finished",
            "unexpected transport hint",
        );
    }

    #[test]
    fn server_status_is_preserved() {
        assert_decode_message(
            internal_status("remote handler returned validation error"),
            "reported an error: remote handler returned validation error",
            "unexpected server status message",
        );
    }

    #[test]
    fn schema_mismatch_detected() {
        assert_decode_message(
            FlightError::Arrow(ArrowError::SchemaError(
                "expected Int32, got Utf8".to_string(),
            )),
            "returned an unexpected schema",
            "schema mismatch hint missing",
        );
    }

    #[test]
    fn malformed_data_reported() {
        assert_decode_message(
            FlightError::Arrow(ArrowError::ParseError("bad payload".to_string())),
            "could not parse",
            "malformed data hint missing",
        );
    }
}

src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_catalog::table_context::TableContext;
2424
use databend_common_exception::ErrorCode;
2525
use databend_common_exception::Result;
2626
use databend_common_expression::udf_client::error_kind;
27+
use databend_common_expression::udf_client::is_transport_error_message;
2728
use databend_common_expression::udf_client::UDFFlightClient;
2829
use databend_common_expression::BlockEntry;
2930
use databend_common_expression::ColumnBuilder;
@@ -156,7 +157,7 @@ fn retry_on(err: &databend_common_exception::ErrorCode) -> bool {
156157
if err.code() == ErrorCode::U_D_F_DATA_ERROR {
157158
let message = err.message();
158159
// this means the server can't handle the request in 60s
159-
if message.contains("h2 protocol error") {
160+
if is_transport_error_message(&message) {
160161
return false;
161162
}
162163
}

src/query/service/tests/it/pipelines/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
mod executor;
1616
mod filter;
1717
mod transforms;
18+
mod udf_transport;

0 commit comments

Comments
 (0)