From e6699409637a8f047e77cd5f38e0b471913d6055 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Mon, 15 Dec 2025 17:59:02 +0100 Subject: [PATCH] [FLINK-38810] Use splittable timers in Interval Join --- .../api/common/functions/RichFunction.java | 10 ++++ .../api/operators/AbstractStreamOperator.java | 60 ++++++++++++++++++- .../operators/AbstractStreamOperatorV2.java | 16 ++++- .../operators/AbstractUdfStreamOperator.java | 17 +++++- .../operators/MailboxWatermarkProcessor.java | 13 +++- .../operators/co/KeyedCoProcessOperator.java | 8 ++- ...edCoProcessOperatorWithWatermarkDelay.java | 37 ++---------- .../join/interval/TimeIntervalJoin.java | 4 ++ 8 files changed, 126 insertions(+), 39 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java index f1aa70a97d953..c1373f701f178 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -112,4 +113,13 @@ public interface RichFunction extends Function { * @param t The runtime context. */ void setRuntimeContext(RuntimeContext t); + + /** + * Can be overridden to enable splittable timers for this particular function even if config + * option is enabled. By default, splittable timers are disabled. + */ + @Internal + default boolean useInterruptibleTimers() { + return false; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a4f2a7563842b..9e82713b23e75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -73,7 +73,10 @@ import java.util.Collections; import java.util.Locale; import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -124,6 +127,8 @@ public abstract class AbstractStreamOperator private transient @Nullable MailboxWatermarkProcessor watermarkProcessor; + private final WatermarkConsumerSupplier watermarkConsumerSupplier; + // ---------------- key/value state ------------------ /** @@ -160,9 +165,17 @@ public abstract class AbstractStreamOperator protected transient RecordAttributes lastRecordAttributes1; protected transient RecordAttributes lastRecordAttributes2; - public AbstractStreamOperator() {} + public AbstractStreamOperator() { + this(null); + } public AbstractStreamOperator(StreamOperatorParameters parameters) { + this(parameters, WatermarkConsumerSupplier.defaultSupplier()); + } + + public AbstractStreamOperator( + StreamOperatorParameters parameters, + WatermarkConsumerSupplier watermarkConsumerSupplier) { if (parameters != null) { setup( parameters.getContainingTask(), @@ -171,12 +184,12 @@ public AbstractStreamOperator(StreamOperatorParameters parameters) { this.processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService()); } + this.watermarkConsumerSupplier = checkNotNull(watermarkConsumerSupplier); } // ------------------------------------------------------------------------ // Life Cycle // ------------------------------------------------------------------------ - protected void setup( StreamTask containingTask, StreamConfig config, @@ -381,7 +394,9 @@ && areInterruptibleTimersConfigured() && getTimeServiceManager().isPresent()) { this.watermarkProcessor = new MailboxWatermarkProcessor( - output, mailboxExecutor, getTimeServiceManager().get()); + watermarkConsumerSupplier.apply(output), + mailboxExecutor, + getTimeServiceManager().get()); } } @@ -770,4 +785,43 @@ public void processWatermark1(WatermarkEvent watermark) throws Exception { public void processWatermark2(WatermarkEvent watermark) throws Exception { output.emitWatermark(watermark); } + + public interface WatermarkConsumerSupplier + extends Function>, Consumer>, Serializable { + + static WatermarkConsumerSupplier defaultSupplier() { + return new DirectWatermarkConsumerSupplier<>(); + } + + static WatermarkConsumerSupplier delayedSupplier(long delay) { + return new DelayedWatermarkConsumerSupplier<>(delay); + } + + class DirectWatermarkConsumerSupplier implements WatermarkConsumerSupplier { + private static final long serialVersionUID = 1L; + + @Override + public Consumer apply(Output> output) { + return output::emitWatermark; + } + } + + class DelayedWatermarkConsumerSupplier implements WatermarkConsumerSupplier { + private static final long serialVersionUID = 1L; + + private final long watermarkDelay; + + public DelayedWatermarkConsumerSupplier(long watermarkDelay) { + Preconditions.checkArgument( + watermarkDelay > 0, "The watermark delay should be positive."); + this.watermarkDelay = watermarkDelay; + } + + @Override + public Consumer apply(Output> out) { + return mark -> + out.emitWatermark(new Watermark(mark.getTimestamp() - watermarkDelay)); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index d89eaf4654458..ad06a6e256bfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -107,12 +107,23 @@ public abstract class AbstractStreamOperatorV2 protected final LatencyStats latencyStats; protected final ProcessingTimeService processingTimeService; protected final RecordAttributes[] lastRecordAttributes; + private final AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier; protected StreamOperatorStateHandler stateHandler; protected InternalTimeServiceManager timeServiceManager; private @Nullable MailboxWatermarkProcessor watermarkProcessor; public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int numberOfInputs) { + this( + parameters, + numberOfInputs, + AbstractStreamOperator.WatermarkConsumerSupplier.defaultSupplier()); + } + + public AbstractStreamOperatorV2( + StreamOperatorParameters parameters, + int numberOfInputs, + AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier) { final Environment environment = parameters.getContainingTask().getEnvironment(); config = parameters.getStreamConfig(); output = parameters.getOutput(); @@ -148,6 +159,7 @@ public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int nu environment.getExternalResourceInfoProvider()); mailboxExecutor = parameters.getMailboxExecutor(); + this.watermarkConsumerSupplier = watermarkConsumerSupplier; } private LatencyStats createLatencyStats( @@ -241,7 +253,9 @@ && areInterruptibleTimersConfigured() && getTimeServiceManager().isPresent()) { watermarkProcessor = new MailboxWatermarkProcessor( - output, mailboxExecutor, getTimeServiceManager().get()); + watermarkConsumerSupplier.apply(output), + mailboxExecutor, + getTimeServiceManager().get()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 656bc2ed9414f..38bc21b2db8cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -18,10 +18,12 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.DefaultOpenContext; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -61,7 +63,14 @@ public AbstractUdfStreamOperator(F userFunction) { } protected AbstractUdfStreamOperator(StreamOperatorParameters parameters, F userFunction) { - super(parameters); + this(parameters, userFunction, WatermarkConsumerSupplier.defaultSupplier()); + } + + protected AbstractUdfStreamOperator( + StreamOperatorParameters parameters, + F userFunction, + WatermarkConsumerSupplier watermarkConsumerSupplier) { + super(parameters, watermarkConsumerSupplier); this.userFunction = requireNonNull(userFunction); checkUdfCheckpointingPreconditions(); } @@ -176,4 +185,10 @@ private void checkUdfCheckpointingPreconditions() { + "CheckpointedFunction AND ListCheckpointed."); } } + + @Internal + public boolean useInterruptibleTimers() { + return userFunction instanceof RichFunction + && ((RichFunction) userFunction).useInterruptibleTimers(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java index fb498f65f07ee..481ea5543982a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Consumer; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -41,7 +43,7 @@ public class MailboxWatermarkProcessor { protected static final Logger LOG = LoggerFactory.getLogger(MailboxWatermarkProcessor.class); - private final Output> output; + private final Consumer output; private final MailboxExecutor mailboxExecutor; private final InternalTimeServiceManager internalTimeServiceManager; @@ -57,6 +59,13 @@ public MailboxWatermarkProcessor( Output> output, MailboxExecutor mailboxExecutor, InternalTimeServiceManager internalTimeServiceManager) { + this(output::emitWatermark, mailboxExecutor, internalTimeServiceManager); + } + + public MailboxWatermarkProcessor( + Consumer output, + MailboxExecutor mailboxExecutor, + InternalTimeServiceManager internalTimeServiceManager) { this.output = checkNotNull(output); this.mailboxExecutor = checkNotNull(mailboxExecutor); this.internalTimeServiceManager = checkNotNull(internalTimeServiceManager); @@ -73,7 +82,7 @@ private void emitWatermarkInsideMailbox() throws Exception { if (internalTimeServiceManager.tryAdvanceWatermark( maxInputWatermark, mailboxExecutor::shouldInterrupt)) { // In case output watermark has fully progressed emit it downstream. - output.emitWatermark(maxInputWatermark); + output.accept(maxInputWatermark); } else if (!progressWatermarkScheduled) { progressWatermarkScheduled = true; // We still have work to do, but we need to let other mails to be processed first. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java index 39eadba89e8a5..9126c4bebdee6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java @@ -54,7 +54,13 @@ public class KeyedCoProcessOperator private transient OnTimerContextImpl onTimerContext; public KeyedCoProcessOperator(KeyedCoProcessFunction keyedCoProcessFunction) { - super(keyedCoProcessFunction); + this(keyedCoProcessFunction, WatermarkConsumerSupplier.defaultSupplier()); + } + + public KeyedCoProcessOperator( + KeyedCoProcessFunction keyedCoProcessFunction, + WatermarkConsumerSupplier watermarkConsumerSupplier) { + super(null, keyedCoProcessFunction, watermarkConsumerSupplier); } @Override diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java index 7552a40e6624b..8473d41acad1f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java @@ -19,14 +19,7 @@ package org.apache.flink.table.runtime.operators.join; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.util.Optional; -import java.util.function.Consumer; /** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */ public class KeyedCoProcessOperatorWithWatermarkDelay @@ -34,34 +27,16 @@ public class KeyedCoProcessOperatorWithWatermarkDelay private static final long serialVersionUID = -7435774708099223442L; - private final Consumer emitter; - public KeyedCoProcessOperatorWithWatermarkDelay( KeyedCoProcessFunction flatMapper, long watermarkDelay) { - super(flatMapper); - Preconditions.checkArgument( - watermarkDelay >= 0, "The watermark delay should be non-negative."); - if (watermarkDelay == 0) { - // emits watermark without delay - emitter = - (Consumer & Serializable) - (Watermark mark) -> output.emitWatermark(mark); - } else { - // emits watermark with delay - emitter = - (Consumer & Serializable) - (Watermark mark) -> - output.emitWatermark( - new Watermark(mark.getTimestamp() - watermarkDelay)); - } + super(flatMapper, getWatermarkConsumer(watermarkDelay)); } - @Override - public void processWatermark(Watermark mark) throws Exception { - Optional> timeServiceManager = getTimeServiceManager(); - if (timeServiceManager.isPresent()) { - timeServiceManager.get().advanceWatermark(mark); + private static WatermarkConsumerSupplier getWatermarkConsumer(long watermarkDelay) { + if (watermarkDelay == 0) { + return WatermarkConsumerSupplier.defaultSupplier(); + } else { + return WatermarkConsumerSupplier.delayedSupplier(watermarkDelay); } - emitter.accept(mark); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java index ba0cee825f572..902a5673543a8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java @@ -484,4 +484,8 @@ private void removeExpiredRows( * @param cleanupTime timestamp for the timer */ abstract void registerTimer(Context ctx, long cleanupTime); + + public boolean useInterruptibleTimers() { + return true; + } }