diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java index fba8d45034fec..51835f1004cb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandInference.java @@ -81,8 +81,10 @@ public void inferOperandTypes( false)) { inferOperandTypesOrError(unwrapTypeFactory(callBinding), callContext, operandTypes); } - } catch (ValidationException | CalciteContextException e) { + } catch (ValidationException e) { // let operand checker fail + } catch (CalciteContextException e) { + throw e; } catch (Throwable t) { throw createUnexpectedException(callContext, t); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index 45ef3e380ee33..d6c47d48459f3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -108,6 +108,21 @@ class CalcTest extends TableTestBase { .isThrownBy(() => util.tableEnv.sqlQuery("SELECT a, foo FROM MyTable")) } + @Test + def testCoalesceOnInvalidField(): Unit = { + assertThatExceptionOfType(classOf[ValidationException]) + .isThrownBy(() => util.verifyExecPlan("SELECT coalesce(SELECT invalid)")) + .withMessageContaining("Column 'invalid' not found in any table") + } + + @Test + def testNestedCoalesceOnInvalidField(): Unit = { + assertThatExceptionOfType(classOf[ValidationException]) + .isThrownBy( + () => util.verifyExecPlan("SELECT coalesce(SELECT coalesce(SELECT coalesce(invalid)))")) + .withMessageContaining("Column 'invalid' not found in any table") + } + @Test def testPrimitiveMapType(): Unit = { util.verifyExecPlan("SELECT MAP[b, 30, 10, a] FROM MyTable")