Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

Expand Down Expand Up @@ -112,4 +113,13 @@ public interface RichFunction extends Function {
* @param t The runtime context.
*/
void setRuntimeContext(RuntimeContext t);

/**
* Can be overridden to enable splittable timers for this particular function even if config
* option is enabled. By default, splittable timers are disabled.
*/
@Internal
default boolean useInterruptibleTimers() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -124,6 +127,8 @@ public abstract class AbstractStreamOperator<OUT>

private transient @Nullable MailboxWatermarkProcessor watermarkProcessor;

private final WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier;

// ---------------- key/value state ------------------

/**
Expand Down Expand Up @@ -160,9 +165,17 @@ public abstract class AbstractStreamOperator<OUT>
protected transient RecordAttributes lastRecordAttributes1;
protected transient RecordAttributes lastRecordAttributes2;

public AbstractStreamOperator() {}
public AbstractStreamOperator() {
this(null);
}

public AbstractStreamOperator(StreamOperatorParameters<OUT> parameters) {
this(parameters, WatermarkConsumerSupplier.defaultSupplier());
}

public AbstractStreamOperator(
StreamOperatorParameters<OUT> parameters,
WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier) {
if (parameters != null) {
setup(
parameters.getContainingTask(),
Expand All @@ -171,12 +184,12 @@ public AbstractStreamOperator(StreamOperatorParameters<OUT> parameters) {
this.processingTimeService =
Preconditions.checkNotNull(parameters.getProcessingTimeService());
}
this.watermarkConsumerSupplier = checkNotNull(watermarkConsumerSupplier);
}

// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------

protected void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Expand Down Expand Up @@ -381,7 +394,9 @@ && areInterruptibleTimersConfigured()
&& getTimeServiceManager().isPresent()) {
this.watermarkProcessor =
new MailboxWatermarkProcessor(
output, mailboxExecutor, getTimeServiceManager().get());
watermarkConsumerSupplier.apply(output),
mailboxExecutor,
getTimeServiceManager().get());
}
}

Expand Down Expand Up @@ -770,4 +785,43 @@ public void processWatermark1(WatermarkEvent watermark) throws Exception {
public void processWatermark2(WatermarkEvent watermark) throws Exception {
output.emitWatermark(watermark);
}

public interface WatermarkConsumerSupplier<OUT>
extends Function<Output<StreamRecord<OUT>>, Consumer<Watermark>>, Serializable {

static <OUT> WatermarkConsumerSupplier<OUT> defaultSupplier() {
return new DirectWatermarkConsumerSupplier<>();
}

static <OUT> WatermarkConsumerSupplier<OUT> delayedSupplier(long delay) {
return new DelayedWatermarkConsumerSupplier<>(delay);
}

class DirectWatermarkConsumerSupplier<OUT> implements WatermarkConsumerSupplier<OUT> {
private static final long serialVersionUID = 1L;

@Override
public Consumer<Watermark> apply(Output<StreamRecord<OUT>> output) {
return output::emitWatermark;
}
}

class DelayedWatermarkConsumerSupplier<OUT> implements WatermarkConsumerSupplier<OUT> {
private static final long serialVersionUID = 1L;

private final long watermarkDelay;

public DelayedWatermarkConsumerSupplier(long watermarkDelay) {
Preconditions.checkArgument(
watermarkDelay > 0, "The watermark delay should be positive.");
this.watermarkDelay = watermarkDelay;
}

@Override
public Consumer<Watermark> apply(Output<StreamRecord<OUT>> out) {
return mark ->
out.emitWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,23 @@ public abstract class AbstractStreamOperatorV2<OUT>
protected final LatencyStats latencyStats;
protected final ProcessingTimeService processingTimeService;
protected final RecordAttributes[] lastRecordAttributes;
private final AbstractStreamOperator.WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier;

protected StreamOperatorStateHandler stateHandler;
protected InternalTimeServiceManager<?> timeServiceManager;
private @Nullable MailboxWatermarkProcessor watermarkProcessor;

public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
this(
parameters,
numberOfInputs,
AbstractStreamOperator.WatermarkConsumerSupplier.defaultSupplier());
}

public AbstractStreamOperatorV2(
StreamOperatorParameters<OUT> parameters,
int numberOfInputs,
AbstractStreamOperator.WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier) {
final Environment environment = parameters.getContainingTask().getEnvironment();
config = parameters.getStreamConfig();
output = parameters.getOutput();
Expand Down Expand Up @@ -148,6 +159,7 @@ public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int nu
environment.getExternalResourceInfoProvider());

mailboxExecutor = parameters.getMailboxExecutor();
this.watermarkConsumerSupplier = watermarkConsumerSupplier;
}

private LatencyStats createLatencyStats(
Expand Down Expand Up @@ -241,7 +253,9 @@ && areInterruptibleTimersConfigured()
&& getTimeServiceManager().isPresent()) {
watermarkProcessor =
new MailboxWatermarkProcessor(
output, mailboxExecutor, getTimeServiceManager().get());
watermarkConsumerSupplier.apply(output),
mailboxExecutor,
getTimeServiceManager().get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -61,7 +63,14 @@ public AbstractUdfStreamOperator(F userFunction) {
}

protected AbstractUdfStreamOperator(StreamOperatorParameters<OUT> parameters, F userFunction) {
super(parameters);
this(parameters, userFunction, WatermarkConsumerSupplier.defaultSupplier());
}

protected AbstractUdfStreamOperator(
StreamOperatorParameters<OUT> parameters,
F userFunction,
WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier) {
super(parameters, watermarkConsumerSupplier);
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
Expand Down Expand Up @@ -176,4 +185,10 @@ private void checkUdfCheckpointingPreconditions() {
+ "CheckpointedFunction AND ListCheckpointed.");
}
}

@Internal
public boolean useInterruptibleTimers() {
return userFunction instanceof RichFunction
&& ((RichFunction) userFunction).useInterruptibleTimers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -41,7 +43,7 @@
public class MailboxWatermarkProcessor<OUT> {
protected static final Logger LOG = LoggerFactory.getLogger(MailboxWatermarkProcessor.class);

private final Output<StreamRecord<OUT>> output;
private final Consumer<Watermark> output;
private final MailboxExecutor mailboxExecutor;
private final InternalTimeServiceManager<?> internalTimeServiceManager;

Expand All @@ -57,6 +59,13 @@ public MailboxWatermarkProcessor(
Output<StreamRecord<OUT>> output,
MailboxExecutor mailboxExecutor,
InternalTimeServiceManager<?> internalTimeServiceManager) {
this(output::emitWatermark, mailboxExecutor, internalTimeServiceManager);
}

public MailboxWatermarkProcessor(
Consumer<Watermark> output,
MailboxExecutor mailboxExecutor,
InternalTimeServiceManager<?> internalTimeServiceManager) {
this.output = checkNotNull(output);
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.internalTimeServiceManager = checkNotNull(internalTimeServiceManager);
Expand All @@ -73,7 +82,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
if (internalTimeServiceManager.tryAdvanceWatermark(
maxInputWatermark, mailboxExecutor::shouldInterrupt)) {
// In case output watermark has fully progressed emit it downstream.
output.emitWatermark(maxInputWatermark);
output.accept(maxInputWatermark);
} else if (!progressWatermarkScheduled) {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
private transient OnTimerContextImpl<K, IN1, IN2, OUT> onTimerContext;

public KeyedCoProcessOperator(KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction) {
super(keyedCoProcessFunction);
this(keyedCoProcessFunction, WatermarkConsumerSupplier.defaultSupplier());
}

public KeyedCoProcessOperator(
KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction,
WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier) {
super(null, keyedCoProcessFunction, watermarkConsumerSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,24 @@
package org.apache.flink.table.runtime.operators.join;

import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.Optional;
import java.util.function.Consumer;

/** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */
public class KeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {

private static final long serialVersionUID = -7435774708099223442L;

private final Consumer<Watermark> emitter;

public KeyedCoProcessOperatorWithWatermarkDelay(
KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper, long watermarkDelay) {
super(flatMapper);
Preconditions.checkArgument(
watermarkDelay >= 0, "The watermark delay should be non-negative.");
if (watermarkDelay == 0) {
// emits watermark without delay
emitter =
(Consumer<Watermark> & Serializable)
(Watermark mark) -> output.emitWatermark(mark);
} else {
// emits watermark with delay
emitter =
(Consumer<Watermark> & Serializable)
(Watermark mark) ->
output.emitWatermark(
new Watermark(mark.getTimestamp() - watermarkDelay));
}
super(flatMapper, getWatermarkConsumer(watermarkDelay));
}

@Override
public void processWatermark(Watermark mark) throws Exception {
Optional<InternalTimeServiceManager<?>> timeServiceManager = getTimeServiceManager();
if (timeServiceManager.isPresent()) {
timeServiceManager.get().advanceWatermark(mark);
private static <OUT> WatermarkConsumerSupplier<OUT> getWatermarkConsumer(long watermarkDelay) {
if (watermarkDelay == 0) {
return WatermarkConsumerSupplier.defaultSupplier();
} else {
return WatermarkConsumerSupplier.delayedSupplier(watermarkDelay);
}
emitter.accept(mark);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,8 @@ private void removeExpiredRows(
* @param cleanupTime timestamp for the timer
*/
abstract void registerTimer(Context ctx, long cleanupTime);

public boolean useInterruptibleTimers() {
return true;
}
}