
Conversation

@gustavodemorais
Contributor

What is the purpose of the change

The planner currently considers the union of the unique/upsert keys from the left and from the right side to be a valid resulting upsert key. That is true for inner joins, but for left/right/full joins it leads to a resulting unique key that contains columns that can be null, which is not valid.

Brief change log

  • Check for null-generating columns when creating the superset of unique keys (a sketch of the idea follows below)
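
Conceptually, the check boils down to dropping candidate keys that reference columns an outer join can pad with NULLs. Below is a minimal Scala sketch of that idea, assuming keys are modeled as plain sets of column indices rather than the planner's actual bit sets; the object and method names are made up for illustration, this is not the actual FlinkRelMdUpsertKeys code.

// Minimal sketch: drop every candidate key that references a null-generating
// column, i.e. a column that an outer join may fill with NULL.
object UpsertKeyFilterSketch {

  def filterNullGenerating(
      candidateKeys: Set[Set[Int]],
      nullGeneratingColumns: Set[Int]): Set[Set[Int]] =
    candidateKeys.filter(_.intersect(nullGeneratingColumns).isEmpty)

  def main(args: Array[String]): Unit = {
    // Candidates mirror the adjusted test expectations: {1}, {1, 5}, {1, 5, 6}.
    // Columns 5 and 6 come from the right side of a LEFT JOIN.
    val result = filterNullGenerating(
      candidateKeys = Set(Set(1), Set(1, 5), Set(1, 5, 6)),
      nullGeneratingColumns = Set(5, 6))
    println(result) // Set(Set(1))
  }
}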

Verifying this change

  • Adjusted existing tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)


// Before: the expected upsert keys included Array(1, 5) and Array(1, 5, 6),
// whose columns 5 and 6 come from the null-producing right side of the left join.
assertEquals(
  toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)),
  mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
// After: only Array(1) remains a valid upsert key.
assertEquals(toBitSet(Array(1)), mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
Contributor

Am I missing something? I do not see tests for nulls on each side.

Contributor Author

The above is the test: 5 and 6 were columns coming from the right side of the join, so they can't be included in unique keys, and that's why "Array(1, 5), Array(1, 5, 6)" were removed as possible unique keys.

The same goes for all the other tests that were updated; they test the new behaviour.

@gustavodemorais
Contributor Author

CI is failing on an unrelated e2e test issue happening across PRs.

@xuyangzhong
Contributor

Hi, @gustavodemorais. I'm wondering why we can't have null columns as part of unique keys? From what I see in the SQL standard, it seems that unique keys can contain null columns.
Here is the specific description from the SQL 2016 standard:

A unique constraint that does not include a <without overlap specification> on a table T is satisfied if and only
if there do not exist two rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique
columns. If a unique constraint UC on a table T includes a <without overlap specification> WOS, then let ATPN
be the <application time period name> contained in WOS. UC is satisfied if and only if there do not exist two
rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique columns and the ATPN
period values of R1 and R2 overlap. In addition, if the unique constraint was defined with PRIMARY KEY,
then it requires that none of the values in the specified column or columns be a null value.

The github-actions bot added the community-reviewed (PR has been reviewed by the community) label on Oct 9, 2025.
@gustavodemorais
Contributor Author

gustavodemorais commented Oct 20, 2025

Hi, @gustavodemorais. I'm wondering why we can't have null columns as part of unique keys? From what I see in the SQL standard, it seems that unique keys can contain null columns.

Hey @xuyangzhong, thanks for the comment. I was at Flink Forward and am just reading your reply now.

That's a good catch. The issue is that we are using the unique keys as upsert keys, and in my understanding we can't allow upsert keys to contain null values; that's what I was trying to fix here. It leads to a runtime error, since a sink expects the upsert key not to contain a null value, which can happen in the case of a left join. https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala#L352

We could also have additional logic in getJoinUpsertKeys to remove such keys. Do you have other suggestions?

@xuyangzhong
Contributor

@gustavodemorais IIUC, currently there is no explicit handling of null values for the upsert key from the planning stage (where it originates from the unique key, which is not restricted from containing null values) to runtime (where the operators do not include any special treatment for columns that may contain null values in the upsert key). In other words, there are no restrictions preventing the upsert key from containing null values. I’m curious about the issue you mentioned. Could you explain it in more detail?

It leads to a runtime error since a sink expects the upsert key not to contain a null value which can happen in a case of a left join

@gustavodemorais
Contributor Author

I’m curious about the issue you mentioned. Could you explain it in more detail?

Sure. If we create two tables and apply a left join, it might happen that the upsert key contains nullable columns, like in the examples in the tests. We can then create a third table whose primary key matches this upsert key and submit an INSERT INTO it. The planner will accept the statement, and as soon as we produce one row with a null value, the job fails.

Here are simple repro steps:

CREATE TABLE `users` (
  `user_id` INT NOT NULL,
  `other_data` STRING,
  CONSTRAINT `PRIMARY` PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'users',
  'value.format' = 'json'
);

CREATE TABLE `orders_with_composite_key` (
  `order_id` BIGINT NOT NULL,
  `user_id` INT NOT NULL,
  `item_name` STRING,
  CONSTRAINT `PRIMARY` PRIMARY KEY (`order_id`, `user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'orders',
  'value.format' = 'json'
);

CREATE TABLE test_join_upsert (
  user_id INT NOT NULL,
  order_id BIGINT NOT NULL,
  user_id0 INT NOT NULL,
  other_data STRING,
  PRIMARY KEY (user_id, order_id, user_id0) NOT ENFORCED -- this matches the nullable upsert key calculated by the planner
)
WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'users_orders_upsert',
  'value.format' = 'json'
);

INSERT INTO test_join_upsert
SELECT
  o.user_id,
  o.order_id,
  u.user_id AS user_id0,
  u.other_data
FROM `users` AS u
  LEFT JOIN `orders_with_composite_key` AS o
  ON o.user_id = u.user_id;

-- this causes the job to fail
INSERT INTO users (VALUES (1, '1'));

Two things happen:

  • Running explain on the insert above doesn't show upsertMaterialize=[true] because the primary key matches the upsert key calculated by the planner (which I don't think is correct)
  • It fails at runtime after inserting a user, with:
Caused by: org.apache.flink.table.runtime.operators.sink.constraint.EnforcerException: Column 'user_id' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.
	at org.apache.flink.table.runtime.operators.sink.constraint.NotNullConstraint.enforce(NotNullConstraint.java:55)
	at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcerExecutor.enforce(ConstraintEnforcerExecutor.java:445)
	at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcer.processElement(ConstraintEnforcer.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at StreamExecCalc$6.processElement(Unknown Source)

Ideally we would fail this during planning. Now, I still think that by definition upsert keys cannot contain null values - do you agree with that? At the same time, while reproducing this I realized the runtime issue might not be solved by addressing nullable upsert keys. What do you think?

@xuyangzhong
Contributor

xuyangzhong commented Oct 22, 2025

@gustavodemorais I agree with your point that we should ideally find this unexpected behavior during the planning phase. In fact, I believe there's a fundamental issue with how this SQL is written. When using a LEFT JOIN, one must be aware that fields from the right table may be null. If the sink table defines some fields from the right table as part of the combined primary key (which must not contain null values), then it's reasonable to encounter an error stating "null attempted to be written to not null".

Moreover, this issue is not related to sink upsert materialization, which is intended to address out-of-order problems. Under this SQL, out-of-order issues aren't present, so upsertMaterialize=[true] won't be shown. In other words, even if upsert materialization were utilized, it wouldn't resolve the potential for null values being written to the primary key.

I ran the SQL on MySQL, and I received a similar error regarding null being written to the primary key.

CREATE TABLE T1 (
  a1 int primary key,
  b1 int
);

CREATE TABLE T2 (
  a2 int primary key,
  b2 int
);

CREATE TABLE T3 (
  a1 int,
  b1 int,
  a2 int,
  b2 int,
  primary key (a1, a2)
);

insert into T1 values(1, 1);
insert into T1 values(2, 2);
insert into T2 values(1, 1);

insert into T3 select * from T1 left join T2 on a1 = a2;
ERROR 1048 (23000) at line 23: Column 'a2' cannot be null

@gustavodemorais
Contributor Author

Hey @xuyangzhong, thanks for the reply. I think we're aligned that it'd be good to have a validation error during the planning phase for this. We can have a different ticket for that. I agree this is not related to the upsertKeys 👍

Regarding upsertKeys, since it's an internal concept that doesn't come from a standard we follow, it would be good to have a place where we keep our definition. Do we have something like this somewhere? Otherwise, we could use this PR to add a detailed description to use as a reference in the future.

@xuyangzhong
Contributor

xuyangzhong commented Oct 29, 2025

@gustavodemorais Sorry for the late reply. I found the description of the upsert key here:

* Determines the set of upsert minimal keys for this expression. A key is represented as an

@gustavodemorais
Contributor Author

gustavodemorais commented Oct 29, 2025

Thanks for the link and the discussion @xuyangzhong. One thing we could make a bit clearer in the description is:

  1. Whether upsertKeys can contain nullable attributes
  2. Whether they can consist of only nullable attributes

I think your point is that 1 is true. Now regarding 2, if we look purely at the current implementation for joins, and if I understand it correctly, they cannot consist only of nullable attributes. This is a bit confusing, since they can contain nullable attributes (1). Do you happen to know why? We have these checks for outer joins, see https://github.com/apache/flink/pull/27090/files#diff-8be3389bbe106e1e8b09a8d33b8d0b803e320384dec6fb486de2867bd923e46cR622
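
For reference, the kind of check I mean can be sketched roughly like this (a simplified Scala sketch of the rule "a key must not consist only of columns from a null-generating side"; it is not the linked planner code, the names are made up, and only JoinRelType with its generatesNullsOnLeft/Right methods is real Calcite API):

import org.apache.calcite.rel.core.JoinRelType

object NullableKeyCheckSketch {

  // A candidate key is rejected if every column it references comes from a
  // side that the join type can pad with NULLs.
  def consistsOnlyOfNullableColumns(
      key: Set[Int],
      leftColumns: Set[Int],
      rightColumns: Set[Int],
      joinType: JoinRelType): Boolean = {
    val nullable =
      (if (joinType.generatesNullsOnLeft()) leftColumns else Set.empty[Int]) ++
        (if (joinType.generatesNullsOnRight()) rightColumns else Set.empty[Int])
    key.nonEmpty && key.subsetOf(nullable)
  }

  def main(args: Array[String]): Unit = {
    // LEFT JOIN: columns 0-1 come from the left side, 2-3 from the (nullable) right side.
    println(consistsOnlyOfNullableColumns(Set(2, 3), Set(0, 1), Set(2, 3), JoinRelType.LEFT)) // true, rejected
    println(consistsOnlyOfNullableColumns(Set(0, 2), Set(0, 1), Set(2, 3), JoinRelType.LEFT)) // false, kept
  }
}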

@xuyangzhong
Contributor

@gustavodemorais The upsert key is essentially the unique key based on the shuffle key, so when we're discussing whether the upsert key can contain or consist solely of null values, we're actually talking about the unique key.

Let me try to explain the code you pointed out. As indicated in the comment, it's a scenario involving a 1-on-1 join. Let's consider that each record on the right can join to either zero or one record on the left. In this case, if we have an inner join (where joinRelType.generatesNullsOnLeft is true), the unique key from the left side can serve as an independent set of unique keys after the join. However, if it's a right outer join, the unique key from the left side cannot independently form a set of unique keys after join.

For example, consider a pkpk join between the left table with schema (pk_left, a) and the right table (pk_right, b), where the join key is defined as pk_left = pk_right. The output would include two columns: pk_left and pk_right.

If it's a right join, the result may include rows like [null, pk_right_1] and [null, pk_right_2], where the values in the pk_left can be null. If this were in a batch processing scenario and we were inserting data directly into a database that has a unique constraint:

  1. If the unique constraint for the result table is defined on the pk_left column, the database will throw an error because it doesn't permit duplicate null values in a unique column.
  2. However, if the unique constraint is defined on the pk_right column or on both pk_left and pk_right, there will be no error.

Based on the reasoning above, the unique keys output from this right join will consist of two sets: {{pk_right}, {pk_left, pk_right}}. And the latter set, {pk_left, pk_right}, may include null values in the pk_left column.
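
To make those two sets concrete, here is a tiny Scala sketch of the derivation for the pk-pk RIGHT JOIN above (a simplified model, not planner code; column 0 stands for pk_left and column 1 for pk_right):

// Right-side keys stand alone (the right side is never NULL-padded in a RIGHT
// JOIN); left-side keys only survive combined with a right-side key.
object RightJoinUniqueKeysSketch {
  def main(args: Array[String]): Unit = {
    val leftKeys = Set(Set(0))  // {pk_left}
    val rightKeys = Set(Set(1)) // {pk_right}

    val combined = for { l <- leftKeys; r <- rightKeys } yield l ++ r
    val joinKeys = rightKeys ++ combined

    println(joinKeys) // Set(Set(1), Set(0, 1)), i.e. {{pk_right}, {pk_left, pk_right}}
  }
}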

@gustavodemorais
Contributor Author

@gustavodemorais The upsert key is essentially the unique key based on the shuffle key, so when we're discussing whether the upsert key can contain or consist solely of null values, we're actually talking about the unique key.

Hey Xuyang, that makes sense. Thanks for the very detailed and well-written reply!

In this case, if we have an inner join (where joinRelType.generatesNullsOnLeft is true)

For an inner join, you probably mean joinRelType.generatesNullsOnLeft is false, right? Thus !joinRelType.generatesNullsOnLeft is true.
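
For what it's worth, a quick check against Calcite's JoinRelType matches that reading (a tiny sketch that just prints the enum's behavior):

import org.apache.calcite.rel.core.JoinRelType

object GeneratesNullsCheck {
  def main(args: Array[String]): Unit = {
    println(JoinRelType.INNER.generatesNullsOnLeft()) // false
    println(JoinRelType.LEFT.generatesNullsOnLeft())  // false
    println(JoinRelType.RIGHT.generatesNullsOnLeft()) // true
    println(JoinRelType.FULL.generatesNullsOnLeft())  // true
  }
}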
