From cf255ce2244d546c6a5f6a8aa3ef085e1943346d Mon Sep 17 00:00:00 2001 From: rajuGT Date: Tue, 23 Sep 2025 23:49:02 +0530 Subject: [PATCH] small refactor --- .../java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index 12c966be7..7114cf829 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -105,11 +105,11 @@ public JobBuilder registerSourceWithPreProcessors() { DataStream dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay)); StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark()); - DataStream rowSingleOutputStreamOperator = streamWatermarkAssigner + DataStream dataStreamWithWaterMark = streamWatermarkAssigner .assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark); TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType()); - StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames()); + StreamInfo streamInfo = new StreamInfo(dataStreamWithWaterMark, tableSchema.getFieldNames()); streamInfo = addPreProcessor(streamInfo, tableName); Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));