From 8b52fa6f5acf07c2bc3f8bfa341624ccabbbae40 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Wed, 11 Oct 2023 21:26:32 +0800 Subject: [PATCH 1/4] feat: add processing time on gcs partition --- .../firehose/config/BlobSinkConfig.java | 4 +++ .../firehose/sink/blob/message/Record.java | 13 ++++++++ .../local/path/TimePartitionedPathUtils.java | 17 ++++------ .../sink/blob/message/RecordTest.java | 33 +++++++++++++++++++ .../blob/writer/WriterOrchestratorTest.java | 9 +++++ 5 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java index 0e01a73f0..da9d94bf5 100644 --- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java @@ -49,6 +49,10 @@ public interface BlobSinkConfig extends AppConfig { @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME") String getFilePartitionProtoTimestampFieldName(); + @Key("SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED") + @DefaultValue("false") + boolean getFilePartitionProcessingTimeEnabled(); + @Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE") @DefaultValue("day") @ConverterClass(BlobSinkFilePartitionTypeConverter.class) diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java index 1bec503de..7f39fb1fd 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java @@ -2,11 +2,14 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; +import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.sink.blob.proto.KafkaMetadataProtoMessage; import lombok.AllArgsConstructor; import lombok.Data; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; @AllArgsConstructor @Data @@ -34,4 +37,14 @@ public Instant getTimestamp(String fieldName) { int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); return Instant.ofEpochSecond(seconds, nanos); } + + public LocalDateTime getLocalDateTime(BlobSinkConfig config) { + if (config.getFilePartitionProcessingTimeEnabled()) { + return LocalDateTime.now(); + } else { + return LocalDateTime.ofInstant( + getTimestamp(config.getFilePartitionProtoTimestampFieldName()), + ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + } + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java index 97e242419..5c4bd6d8e 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java @@ -7,11 +7,7 @@ import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Instant; -import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.ZoneId; import java.time.format.DateTimeFormatter; /** @@ -25,14 +21,13 @@ public class TimePartitionedPathUtils { public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConfig) { String topic = record.getTopic(sinkConfig.getOutputKafkaMetadataColumnName()); - Instant timestamp = record.getTimestamp(sinkConfig.getFilePartitionProtoTimestampFieldName()); + Path path = Paths.get(topic); if (sinkConfig.getFilePartitionTimeGranularityType() == Constants.FilePartitionType.NONE) { - return Paths.get(topic); + return path; } - LocalDate localDate = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalDate(); - String datePart = DATE_FORMATTER.format(localDate); - LocalTime localTime = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalTime(); - String hourPart = HOUR_FORMATTER.format(localTime); + LocalDateTime dateTime = record.getLocalDateTime(sinkConfig); + String datePart = DATE_FORMATTER.format(dateTime.toLocalDate()); + String hourPart = HOUR_FORMATTER.format(dateTime.toLocalTime()); String dateSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeDatePrefix(), datePart); String hourSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeHourPrefix(), hourPart); @@ -40,7 +35,7 @@ public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConf String dateTimePartition; switch (sinkConfig.getFilePartitionTimeGranularityType()) { case NONE: - return Paths.get(topic); + return path; case DAY: dateTimePartition = String.format("%s", dateSegment); break; diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java index e997729e9..c7c2504eb 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java @@ -1,11 +1,15 @@ package com.gotocompany.firehose.sink.blob.message; import com.google.protobuf.DynamicMessage; +import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.sink.blob.TestUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; public class RecordTest { @@ -39,4 +43,33 @@ public void shouldGetTimeStampFromMessage() { Record record = new Record(message, metadata); Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time")); } + + @Test + public void shouldGetDateTimeLocally() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(true); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime before = LocalDateTime.now(); + Thread.sleep(1000); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Thread.sleep(1000); + LocalDateTime after = LocalDateTime.now(); + Assert.assertTrue(localDateTime.isAfter(before)); + Assert.assertTrue(localDateTime.isBefore(after)); + } + + @Test + public void shouldGetDateTimeFromMessage() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(false); + Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); + Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime); + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java index e26f0077a..915c0e2f4 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.HashSet; import java.util.Set; @@ -58,6 +60,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); this.sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); + Mockito.when(sinkConfig.getFilePartitionProcessingTimeEnabled()).thenReturn(false); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(""); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR); @@ -68,6 +71,7 @@ public void setUp() { @Test public void shouldCreateLocalFileWriter() throws Exception { Record record = Mockito.mock(Record.class); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); @@ -82,6 +86,7 @@ public void shouldCreateLocalFileWriter() throws Exception { @Test public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { Record record1 = Mockito.mock(Record.class); + Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone))); Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record1.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1); @@ -89,6 +94,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test1"); Record record2 = Mockito.mock(Record.class); + Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone))); Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L)); Mockito.when(record2.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2); @@ -106,6 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { @Test(expected = IOException.class) public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception { Record record = Mockito.mock(Record.class); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); @@ -120,6 +127,7 @@ public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception { public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { expectedException.expect(LocalFileWriterFailedException.class); Record record = Mockito.mock(Record.class); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); @@ -133,6 +141,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { public void shouldGetEmptyFlushedPath() throws Exception { Record record = Mockito.mock(Record.class); Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone))); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); From 10ac7d40e39d24bc5c31926586b377ecd42718e0 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Fri, 13 Oct 2023 22:57:03 +0800 Subject: [PATCH 2/4] feat: adding message timestamp partition --- .../firehose/config/BlobSinkConfig.java | 7 ++-- .../config/enums/TimePartitionType.java | 8 +++++ .../firehose/sink/blob/message/Record.java | 32 +++++++++++++------ .../sink/blob/message/RecordTest.java | 6 ++-- .../blob/writer/WriterOrchestratorTest.java | 13 ++++---- 5 files changed, 44 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java index da9d94bf5..34288e274 100644 --- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java @@ -3,6 +3,7 @@ import com.gotocompany.firehose.config.converter.BlobSinkFilePartitionTypeConverter; import com.gotocompany.firehose.config.converter.BlobSinkLocalFileWriterTypeConverter; import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType; import com.gotocompany.firehose.sink.blob.Constants; @@ -49,9 +50,9 @@ public interface BlobSinkConfig extends AppConfig { @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME") String getFilePartitionProtoTimestampFieldName(); - @Key("SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED") - @DefaultValue("false") - boolean getFilePartitionProcessingTimeEnabled(); + @Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE") + @DefaultValue("EVENT_TIMESTAMP") + TimePartitionType getFilePartitionTimeType(); @Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE") @DefaultValue("day") diff --git a/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java b/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java new file mode 100644 index 000000000..87c9ece00 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java @@ -0,0 +1,8 @@ +package com.gotocompany.firehose.config.enums; + +public enum TimePartitionType { + MESSAGE_TIMESTAMP, + PROCESSING_TIMESTAMP, + EVENT_TIMESTAMP + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java index 7f39fb1fd..10b118f59 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java @@ -29,22 +29,36 @@ public String getTopic(String fieldName) { return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME)); } - public Instant getTimestamp(String fieldName) { - Descriptors.Descriptor descriptor = message.getDescriptorForType(); + public Instant getTimestampFromMessage(String fieldName) { + return getTimeStampFromDescriptor(fieldName, message); + } + + public Instant getTimestampFromMetadata(String fieldName) { + return getTimeStampFromDescriptor(fieldName, metadata); + } + + public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) { + Descriptors.Descriptor descriptor = m.getDescriptorForType(); Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName(fieldName); - DynamicMessage timestamp = (DynamicMessage) message.getField(timestampField); + DynamicMessage timestamp = (DynamicMessage) m.getField(timestampField); long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds")); int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); return Instant.ofEpochSecond(seconds, nanos); } public LocalDateTime getLocalDateTime(BlobSinkConfig config) { - if (config.getFilePartitionProcessingTimeEnabled()) { - return LocalDateTime.now(); - } else { - return LocalDateTime.ofInstant( - getTimestamp(config.getFilePartitionProtoTimestampFieldName()), - ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + switch (config.getFilePartitionTimeType()) { + case MESSAGE_TIMESTAMP: + return LocalDateTime.ofInstant( + getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME), + ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + case PROCESSING_TIMESTAMP: + return LocalDateTime.now(); + default: + return LocalDateTime.ofInstant( + getTimestampFromMessage(config.getFilePartitionProtoTimestampFieldName()), + ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + } } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java index c7c2504eb..d2bd958a1 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.DynamicMessage; import com.gotocompany.firehose.config.BlobSinkConfig; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.blob.TestUtils; import org.junit.Assert; import org.junit.Test; @@ -41,13 +42,13 @@ public void shouldGetTimeStampFromMessage() { DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); Record record = new Record(message, metadata); - Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time")); + Assert.assertEquals(defaultTimestamp, record.getTimestampFromMessage("created_time")); } @Test public void shouldGetDateTimeLocally() throws InterruptedException { BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); - Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(true); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.PROCESSING_TIMESTAMP); DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); Record record = new Record(message, metadata); @@ -63,7 +64,6 @@ public void shouldGetDateTimeLocally() throws InterruptedException { @Test public void shouldGetDateTimeFromMessage() throws InterruptedException { BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); - Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(false); Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java index 915c0e2f4..934a6edbd 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java @@ -60,7 +60,6 @@ public void setUp() { MockitoAnnotations.initMocks(this); this.sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); - Mockito.when(sinkConfig.getFilePartitionProcessingTimeEnabled()).thenReturn(false); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(""); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR); @@ -72,7 +71,7 @@ public void setUp() { public void shouldCreateLocalFileWriter() throws Exception { Record record = Mockito.mock(Record.class); Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); @@ -87,7 +86,7 @@ public void shouldCreateLocalFileWriter() throws Exception { public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { Record record1 = Mockito.mock(Record.class); Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone))); - Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record1.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record1.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1); Mockito.when(localFileWriter1.write(record1)).thenReturn(true); @@ -95,7 +94,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { Record record2 = Mockito.mock(Record.class); Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone))); - Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L)); + Mockito.when(record2.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L)); Mockito.when(record2.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2); Mockito.when(localFileWriter2.write(record2)).thenReturn(true); @@ -113,7 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception { Record record = Mockito.mock(Record.class); Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); @@ -128,7 +127,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { expectedException.expect(LocalFileWriterFailedException.class); Record record = Mockito.mock(Record.class); Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenThrow(new LocalFileWriterFailedException(new IOException("Some error"))); @@ -140,7 +139,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { @Test public void shouldGetEmptyFlushedPath() throws Exception { Record record = Mockito.mock(Record.class); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone))); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); From 16312305346f2a2b2f153267f0781d54c9592980 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Mon, 16 Oct 2023 00:24:26 +0800 Subject: [PATCH 3/4] test: add unit tests --- .../firehose/sink/blob/message/Record.java | 23 ++++++++++++++----- .../sink/blob/message/RecordTest.java | 15 ++++++++++++ .../local/TimePartitionedPathUtilsTest.java | 5 ++++ 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java index 10b118f59..75c166042 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java @@ -29,12 +29,23 @@ public String getTopic(String fieldName) { return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME)); } - public Instant getTimestampFromMessage(String fieldName) { - return getTimeStampFromDescriptor(fieldName, message); + public Instant getMessageTimeStamp(String metadataColumnName) { + Descriptors.Descriptor metadataDescriptor = metadata.getDescriptorForType(); + com.google.protobuf.Timestamp timestamp; + if (!metadataColumnName.isEmpty()) { + DynamicMessage nestedMetadataMessage = (DynamicMessage) metadata.getField(metadataDescriptor.findFieldByName(metadataColumnName)); + Descriptors.Descriptor nestedMetadataMessageDescriptor = nestedMetadataMessage.getDescriptorForType(); + timestamp = (com.google.protobuf.Timestamp) nestedMetadataMessage.getField(nestedMetadataMessageDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } else { + timestamp = (com.google.protobuf.Timestamp) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } + long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds")); + int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); + return Instant.ofEpochSecond(seconds, nanos); } - public Instant getTimestampFromMetadata(String fieldName) { - return getTimeStampFromDescriptor(fieldName, metadata); + public Instant getTimestampFromMessage(String fieldName) { + return getTimeStampFromDescriptor(fieldName, message); } public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) { @@ -50,8 +61,8 @@ public LocalDateTime getLocalDateTime(BlobSinkConfig config) { switch (config.getFilePartitionTimeType()) { case MESSAGE_TIMESTAMP: return LocalDateTime.ofInstant( - getTimestampFromMetadata(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME), - ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + getMessageTimeStamp(config.getOutputKafkaMetadataColumnName()), + ZoneId.of("UTC")); case PROCESSING_TIMESTAMP: return LocalDateTime.now(); default: diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java index d2bd958a1..e0d449f91 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java @@ -15,6 +15,7 @@ public class RecordTest { private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z"); + private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z"); private final int defaultOrderNumber = 100; private final long defaultOffset = 1L; private final int defaultPartition = 1; @@ -64,6 +65,7 @@ public void shouldGetDateTimeLocally() throws InterruptedException { @Test public void shouldGetDateTimeFromMessage() throws InterruptedException { BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); @@ -72,4 +74,17 @@ public void shouldGetDateTimeFromMessage() throws InterruptedException { LocalDateTime localDateTime = record.getLocalDateTime(config); Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime); } + @Test + public void shouldGetDateTimeFromKafkaMessage() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP); + Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field"); + Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); + Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime); + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java index b67359b4c..8771123e3 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.DynamicMessage; import com.gotocompany.firehose.config.BlobSinkConfig; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.blob.Constants; import com.gotocompany.firehose.sink.blob.TestProtoMessage; import com.gotocompany.firehose.sink.blob.TestUtils; @@ -41,6 +42,7 @@ public void shouldCreateDayPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn("date="); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(""); @@ -58,6 +60,7 @@ public void shouldCreateHourPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -91,6 +94,7 @@ public void shouldCreatePartitionPathWhenKafkaMetadataIsNotNested() { BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -107,6 +111,7 @@ public void shouldCreatePartitioningPathForNestedKafkaMetadata() { Record record = new Record(message, metadata); BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); From fcc8443584824d0e3e82f6c97d34a57d17a1b139 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Tue, 7 Nov 2023 15:24:00 +0800 Subject: [PATCH 4/4] feat: add global policy for file rotation --- .../gotocompany/firehose/config/BlobSinkConfig.java | 5 +++++ .../firehose/sink/blob/BlobSinkFactory.java | 4 ++++ .../sink/blob/writer/local/LocalFileChecker.java | 13 ++++++++++--- .../sink/blob/writer/local/LocalStorage.java | 12 +++++++++++- .../writer/local/policy/GlobalWriterPolicy.java | 10 ++++++++++ .../local/policy/SizeBasedRotatingPolicy.java | 10 +++++++++- .../sink/blob/writer/local/LocalStorageTest.java | 4 +++- 7 files changed, 52 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java index 34288e274..e29dd5a17 100644 --- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java @@ -50,6 +50,11 @@ public interface BlobSinkConfig extends AppConfig { @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME") String getFilePartitionProtoTimestampFieldName(); + @Key("SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES") + @DefaultValue("268435456") + long getGlobalFileRotationMaxSizeBytes(); + + @Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE") @DefaultValue("EVENT_TIMESTAMP") TimePartitionType getFilePartitionTimeType(); diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java index 0f52b7392..502f8f811 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java @@ -7,6 +7,7 @@ import com.gotocompany.firehose.sink.blob.message.MessageDeSerializer; import com.gotocompany.firehose.sink.blob.writer.WriterOrchestrator; import com.gotocompany.firehose.sink.blob.writer.local.LocalStorage; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.SizeBasedRotatingPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.TimeBasedRotatingPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; @@ -53,13 +54,16 @@ private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig, Descriptors.Descriptor outputMessageDescriptor = stencilClient.get(sinkConfig.getInputSchemaProtoClass()); Descriptors.Descriptor metadataMessageDescriptor = getMetadataMessageDescriptor(sinkConfig); List writerPolicies = new ArrayList<>(); + List globalWriterPolicies = new ArrayList<>(); writerPolicies.add(new TimeBasedRotatingPolicy(sinkConfig.getLocalFileRotationDurationMS())); writerPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getLocalFileRotationMaxSizeBytes())); + globalWriterPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getGlobalFileRotationMaxSizeBytes())); return new LocalStorage( sinkConfig, outputMessageDescriptor, metadataMessageDescriptor.getFields(), writerPolicies, + globalWriterPolicies, new FirehoseInstrumentation(statsDReporter, LocalStorage.class)); } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java index ddefbf81a..a0a8498b6 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java @@ -31,9 +31,16 @@ public LocalFileChecker(Queue toBeFlushedToRemotePaths, @Override public void run() { firehoseInstrumentation.captureValue(BlobStorageMetrics.LOCAL_FILE_OPEN_TOTAL, timePartitionWriterMap.size()); - Map toBeRotated = - timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map toBeRotated; + if (localStorage.shouldRotate(timePartitionWriterMap.values())) { + // rotate all + toBeRotated = timePartitionWriterMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } else { + toBeRotated = + timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } timePartitionWriterMap.entrySet().removeAll(toBeRotated.entrySet()); toBeRotated.forEach((path, writer) -> { try { diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java index 2552df33f..dff653bee 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java @@ -4,6 +4,7 @@ import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.exception.ConfigurationException; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; import lombok.AllArgsConstructor; @@ -11,8 +12,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; @AllArgsConstructor public class LocalStorage { @@ -21,6 +24,7 @@ public class LocalStorage { private final Descriptors.Descriptor messageDescriptor; private final List metadataFieldDescriptor; private final List policies; + private final List globalPolicies; private final FirehoseInstrumentation firehoseInstrumentation; public LocalFileWriter createLocalFileWriter(Path partitionPath) { @@ -31,7 +35,7 @@ public LocalFileWriter createLocalFileWriter(Path partitionPath) { return createWriter(basePath, fullPath); } - private LocalParquetFileWriter createWriter(Path basePath, Path fullPath) { + private LocalFileWriter createWriter(Path basePath, Path fullPath) { switch (sinkConfig.getLocalFileWriterType()) { case PARQUET: try { @@ -78,4 +82,10 @@ public void deleteLocalFile(Path... paths) throws IOException { public Boolean shouldRotate(LocalFileWriter writer) { return policies.stream().anyMatch(writerPolicy -> writerPolicy.shouldRotate(writer.getMetadata())); } + + public Boolean shouldRotate(Collection writers) { + return globalPolicies.stream().anyMatch(policy -> policy.shouldRotate( + writers.stream().map(LocalFileWriter::getMetadata).collect(Collectors.toList()) + )); + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java new file mode 100644 index 000000000..f58f5751f --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java @@ -0,0 +1,10 @@ +package com.gotocompany.firehose.sink.blob.writer.local.policy; + +import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata; + +import java.util.List; + +public interface GlobalWriterPolicy { + + boolean shouldRotate(List metadata); +} diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java index 289d91e64..f3a5938d8 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java @@ -2,7 +2,9 @@ import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata; -public class SizeBasedRotatingPolicy implements WriterPolicy { +import java.util.List; + +public class SizeBasedRotatingPolicy implements WriterPolicy, GlobalWriterPolicy { private final long maxSize; @@ -17,4 +19,10 @@ public SizeBasedRotatingPolicy(long maxSize) { public boolean shouldRotate(LocalFileMetadata metadata) { return metadata.getSize() >= maxSize; } + + @Override + public boolean shouldRotate(List metadataList) { + long totalSize = metadataList.stream().map(LocalFileMetadata::getSize).reduce(0L, Long::sum); + return totalSize >= maxSize; + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java index 411089ffa..87bbb725d 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java @@ -5,6 +5,7 @@ import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.blob.Constants; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; import org.junit.Test; import org.mockito.Mockito; @@ -20,8 +21,9 @@ public void shouldDeleteFiles() throws Exception { BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); List metadataFieldDescriptor = new ArrayList<>(); List policies = new ArrayList<>(); + List globalWriterPolicies = new ArrayList<>(); FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class); - LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, firehoseInstrumentation); + LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, globalWriterPolicies, firehoseInstrumentation); LocalStorage spy = Mockito.spy(storage); Mockito.doNothing().when(spy).deleteLocalFile(Paths.get("/tmp/a"), Paths.get("/tmp/.a.crc")); Mockito.when(sinkConfig.getLocalFileWriterType()).thenReturn(Constants.WriterType.PARQUET);