diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 046c131edf57c..41fd2838a2cd2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -566,7 +566,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu isLeftUnique: JBoolean, isRightUnique: JBoolean): JSet[ImmutableBitSet] = { - // first add the different combinations of concatenated unique keys + // First add the different combinations of concatenated unique keys // from the left and the right, adjusting the right hand side keys to // reflect the addition of the left hand side // @@ -576,15 +576,25 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu // an alternative way of getting unique key information. val retSet = new JHashSet[ImmutableBitSet] val nFieldsOnLeft = leftFieldsCount - val rightSet = if (rightUniqueKeys != null) { + + // We only want to do this if the right side has unique keys AND + // they cannot be null. If the side of the join can generate null keys, + // then we can't use it in the unique key. + val rightSet = if (rightUniqueKeys != null && !joinRelType.generatesNullsOnRight) { val res = new JHashSet[ImmutableBitSet] + // This only collects the unique keys from the right side, offsetting + // the column positions appropriately. rightUniqueKeys.foreach { colMask => val tmpMask = ImmutableBitSet.builder colMask.foreach(bit => tmpMask.set(bit + nFieldsOnLeft)) res.add(tmpMask.build()) } - if (leftUniqueKeys != null) { + + // If the left side has unique keys AND the keys cannot be null, + // then we can combine them with the right side unique keys to form + // a superset of unique keys. + if (leftUniqueKeys != null && !joinRelType.generatesNullsOnLeft) { res.foreach { // 1) Concatenate unique keys from both sides to get a superset that is unique. // If left is unique on {0,1} and right on {0}, then {0,1} (after offset) remains unique, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala index ddb440bc320cd..403438915fd68 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala @@ -299,25 +299,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalInnerJoinWithoutEquiCond).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet) - assertEquals( - uniqueKeys(Array(1), Array(1, 5), Array(1, 5, 6)), - mq.getUniqueKeys(logicalLeftJoinOnUniqueKeys).toSet) + assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalLeftJoinOnUniqueKeys).toSet) + assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinNotOnUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinOnRHSUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinWithoutEquiCond).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet) assertEquals( - uniqueKeys(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)), + uniqueKeys(Array(5), Array(5, 6)), mq.getUniqueKeys(logicalRightJoinOnUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinNotOnUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinOnLHSUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinWithoutEquiCond).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinWithEquiAndNonEquiCond).toSet) - assertEquals( - uniqueKeys(Array(1, 5), Array(1, 5, 6)), - mq.getUniqueKeys(logicalFullJoinOnUniqueKeys).toSet) + assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinNotOnUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnRHSUniqueKeys).toSet) assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithoutEquiCond).toSet) @@ -381,11 +378,11 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { @Test def testGetUniqueKeysOnTableScanTable(): Unit = { assertEquals( - uniqueKeys(Array(0, 1), Array(0, 1, 5)), + uniqueKeys(Array(0, 1)), mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet ) assertEquals( - uniqueKeys(Array(0, 1, 5)), + uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet ) assertEquals( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala index 40e38dda16120..135b00268b88a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala @@ -327,25 +327,21 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase { assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithoutEquiCond).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet) - assertEquals( - toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)), - mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet) + assertEquals(toBitSet(Array(1)), mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinNotOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinOnRHSUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithoutEquiCond).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet) assertEquals( - toBitSet(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)), + toBitSet(Array(5), Array(5, 6)), mq.getUpsertKeys(logicalRightJoinOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinNotOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinOnLHSUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinWithoutEquiCond).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinWithEquiAndNonEquiCond).toSet) - assertEquals( - toBitSet(Array(1, 5), Array(1, 5, 6)), - mq.getUpsertKeys(logicalFullJoinOnUniqueKeys).toSet) + assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinNotOnUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinOnRHSUniqueKeys).toSet) assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinWithoutEquiCond).toSet)