diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java index 0e01a73f0..e29dd5a17 100644 --- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java @@ -3,6 +3,7 @@ import com.gotocompany.firehose.config.converter.BlobSinkFilePartitionTypeConverter; import com.gotocompany.firehose.config.converter.BlobSinkLocalFileWriterTypeConverter; import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType; import com.gotocompany.firehose.sink.blob.Constants; @@ -49,6 +50,15 @@ public interface BlobSinkConfig extends AppConfig { @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME") String getFilePartitionProtoTimestampFieldName(); + @Key("SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES") + @DefaultValue("268435456") + long getGlobalFileRotationMaxSizeBytes(); + + + @Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE") + @DefaultValue("EVENT_TIMESTAMP") + TimePartitionType getFilePartitionTimeType(); + @Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE") @DefaultValue("day") @ConverterClass(BlobSinkFilePartitionTypeConverter.class) diff --git a/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java b/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java new file mode 100644 index 000000000..87c9ece00 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/enums/TimePartitionType.java @@ -0,0 +1,8 @@ +package com.gotocompany.firehose.config.enums; + +public enum TimePartitionType { + MESSAGE_TIMESTAMP, + PROCESSING_TIMESTAMP, + EVENT_TIMESTAMP + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java index 0f52b7392..502f8f811 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java @@ -7,6 +7,7 @@ import com.gotocompany.firehose.sink.blob.message.MessageDeSerializer; import com.gotocompany.firehose.sink.blob.writer.WriterOrchestrator; import com.gotocompany.firehose.sink.blob.writer.local.LocalStorage; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.SizeBasedRotatingPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.TimeBasedRotatingPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; @@ -53,13 +54,16 @@ private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig, Descriptors.Descriptor outputMessageDescriptor = stencilClient.get(sinkConfig.getInputSchemaProtoClass()); Descriptors.Descriptor metadataMessageDescriptor = getMetadataMessageDescriptor(sinkConfig); List writerPolicies = new ArrayList<>(); + List globalWriterPolicies = new ArrayList<>(); writerPolicies.add(new TimeBasedRotatingPolicy(sinkConfig.getLocalFileRotationDurationMS())); writerPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getLocalFileRotationMaxSizeBytes())); + globalWriterPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getGlobalFileRotationMaxSizeBytes())); return new LocalStorage( sinkConfig, outputMessageDescriptor, metadataMessageDescriptor.getFields(), writerPolicies, + globalWriterPolicies, new FirehoseInstrumentation(statsDReporter, LocalStorage.class)); } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java index 1bec503de..75c166042 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java @@ -2,11 +2,14 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; +import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.sink.blob.proto.KafkaMetadataProtoMessage; import lombok.AllArgsConstructor; import lombok.Data; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; @AllArgsConstructor @Data @@ -26,12 +29,47 @@ public String getTopic(String fieldName) { return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME)); } - public Instant getTimestamp(String fieldName) { - Descriptors.Descriptor descriptor = message.getDescriptorForType(); + public Instant getMessageTimeStamp(String metadataColumnName) { + Descriptors.Descriptor metadataDescriptor = metadata.getDescriptorForType(); + com.google.protobuf.Timestamp timestamp; + if (!metadataColumnName.isEmpty()) { + DynamicMessage nestedMetadataMessage = (DynamicMessage) metadata.getField(metadataDescriptor.findFieldByName(metadataColumnName)); + Descriptors.Descriptor nestedMetadataMessageDescriptor = nestedMetadataMessage.getDescriptorForType(); + timestamp = (com.google.protobuf.Timestamp) nestedMetadataMessage.getField(nestedMetadataMessageDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } else { + timestamp = (com.google.protobuf.Timestamp) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME)); + } + long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds")); + int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); + return Instant.ofEpochSecond(seconds, nanos); + } + + public Instant getTimestampFromMessage(String fieldName) { + return getTimeStampFromDescriptor(fieldName, message); + } + + public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) { + Descriptors.Descriptor descriptor = m.getDescriptorForType(); Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName(fieldName); - DynamicMessage timestamp = (DynamicMessage) message.getField(timestampField); + DynamicMessage timestamp = (DynamicMessage) m.getField(timestampField); long seconds = (long) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("seconds")); int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos")); return Instant.ofEpochSecond(seconds, nanos); } + + public LocalDateTime getLocalDateTime(BlobSinkConfig config) { + switch (config.getFilePartitionTimeType()) { + case MESSAGE_TIMESTAMP: + return LocalDateTime.ofInstant( + getMessageTimeStamp(config.getOutputKafkaMetadataColumnName()), + ZoneId.of("UTC")); + case PROCESSING_TIMESTAMP: + return LocalDateTime.now(); + default: + return LocalDateTime.ofInstant( + getTimestampFromMessage(config.getFilePartitionProtoTimestampFieldName()), + ZoneId.of(config.getFilePartitionProtoTimestampTimezone())); + + } + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java index ddefbf81a..a0a8498b6 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java @@ -31,9 +31,16 @@ public LocalFileChecker(Queue toBeFlushedToRemotePaths, @Override public void run() { firehoseInstrumentation.captureValue(BlobStorageMetrics.LOCAL_FILE_OPEN_TOTAL, timePartitionWriterMap.size()); - Map toBeRotated = - timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map toBeRotated; + if (localStorage.shouldRotate(timePartitionWriterMap.values())) { + // rotate all + toBeRotated = timePartitionWriterMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } else { + toBeRotated = + timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } timePartitionWriterMap.entrySet().removeAll(toBeRotated.entrySet()); toBeRotated.forEach((path, writer) -> { try { diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java index 2552df33f..dff653bee 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java @@ -4,6 +4,7 @@ import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.exception.ConfigurationException; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; import lombok.AllArgsConstructor; @@ -11,8 +12,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; @AllArgsConstructor public class LocalStorage { @@ -21,6 +24,7 @@ public class LocalStorage { private final Descriptors.Descriptor messageDescriptor; private final List metadataFieldDescriptor; private final List policies; + private final List globalPolicies; private final FirehoseInstrumentation firehoseInstrumentation; public LocalFileWriter createLocalFileWriter(Path partitionPath) { @@ -31,7 +35,7 @@ public LocalFileWriter createLocalFileWriter(Path partitionPath) { return createWriter(basePath, fullPath); } - private LocalParquetFileWriter createWriter(Path basePath, Path fullPath) { + private LocalFileWriter createWriter(Path basePath, Path fullPath) { switch (sinkConfig.getLocalFileWriterType()) { case PARQUET: try { @@ -78,4 +82,10 @@ public void deleteLocalFile(Path... paths) throws IOException { public Boolean shouldRotate(LocalFileWriter writer) { return policies.stream().anyMatch(writerPolicy -> writerPolicy.shouldRotate(writer.getMetadata())); } + + public Boolean shouldRotate(Collection writers) { + return globalPolicies.stream().anyMatch(policy -> policy.shouldRotate( + writers.stream().map(LocalFileWriter::getMetadata).collect(Collectors.toList()) + )); + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java index 97e242419..5c4bd6d8e 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java @@ -7,11 +7,7 @@ import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Instant; -import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.ZoneId; import java.time.format.DateTimeFormatter; /** @@ -25,14 +21,13 @@ public class TimePartitionedPathUtils { public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConfig) { String topic = record.getTopic(sinkConfig.getOutputKafkaMetadataColumnName()); - Instant timestamp = record.getTimestamp(sinkConfig.getFilePartitionProtoTimestampFieldName()); + Path path = Paths.get(topic); if (sinkConfig.getFilePartitionTimeGranularityType() == Constants.FilePartitionType.NONE) { - return Paths.get(topic); + return path; } - LocalDate localDate = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalDate(); - String datePart = DATE_FORMATTER.format(localDate); - LocalTime localTime = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalTime(); - String hourPart = HOUR_FORMATTER.format(localTime); + LocalDateTime dateTime = record.getLocalDateTime(sinkConfig); + String datePart = DATE_FORMATTER.format(dateTime.toLocalDate()); + String hourPart = HOUR_FORMATTER.format(dateTime.toLocalTime()); String dateSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeDatePrefix(), datePart); String hourSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeHourPrefix(), hourPart); @@ -40,7 +35,7 @@ public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConf String dateTimePartition; switch (sinkConfig.getFilePartitionTimeGranularityType()) { case NONE: - return Paths.get(topic); + return path; case DAY: dateTimePartition = String.format("%s", dateSegment); break; diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java new file mode 100644 index 000000000..f58f5751f --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java @@ -0,0 +1,10 @@ +package com.gotocompany.firehose.sink.blob.writer.local.policy; + +import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata; + +import java.util.List; + +public interface GlobalWriterPolicy { + + boolean shouldRotate(List metadata); +} diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java index 289d91e64..f3a5938d8 100644 --- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java +++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java @@ -2,7 +2,9 @@ import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata; -public class SizeBasedRotatingPolicy implements WriterPolicy { +import java.util.List; + +public class SizeBasedRotatingPolicy implements WriterPolicy, GlobalWriterPolicy { private final long maxSize; @@ -17,4 +19,10 @@ public SizeBasedRotatingPolicy(long maxSize) { public boolean shouldRotate(LocalFileMetadata metadata) { return metadata.getSize() >= maxSize; } + + @Override + public boolean shouldRotate(List metadataList) { + long totalSize = metadataList.stream().map(LocalFileMetadata::getSize).reduce(0L, Long::sum); + return totalSize >= maxSize; + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java index e997729e9..e0d449f91 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java @@ -1,15 +1,21 @@ package com.gotocompany.firehose.sink.blob.message; import com.google.protobuf.DynamicMessage; +import com.gotocompany.firehose.config.BlobSinkConfig; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.blob.TestUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; public class RecordTest { private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z"); + private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z"); private final int defaultOrderNumber = 100; private final long defaultOffset = 1L; private final int defaultPartition = 1; @@ -37,6 +43,48 @@ public void shouldGetTimeStampFromMessage() { DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); Record record = new Record(message, metadata); - Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time")); + Assert.assertEquals(defaultTimestamp, record.getTimestampFromMessage("created_time")); + } + + @Test + public void shouldGetDateTimeLocally() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.PROCESSING_TIMESTAMP); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime before = LocalDateTime.now(); + Thread.sleep(1000); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Thread.sleep(1000); + LocalDateTime after = LocalDateTime.now(); + Assert.assertTrue(localDateTime.isAfter(before)); + Assert.assertTrue(localDateTime.isBefore(after)); + } + + @Test + public void shouldGetDateTimeFromMessage() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); + Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); + Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime); + } + @Test + public void shouldGetDateTimeFromKafkaMessage() throws InterruptedException { + BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class); + Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP); + Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field"); + Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time"); + Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC"); + DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber); + DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic); + Record record = new Record(message, metadata); + LocalDateTime localDateTime = record.getLocalDateTime(config); + Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime); } } diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java index e26f0077a..934a6edbd 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.HashSet; import java.util.Set; @@ -68,7 +70,8 @@ public void setUp() { @Test public void shouldCreateLocalFileWriter() throws Exception { Record record = Mockito.mock(Record.class); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); @@ -82,14 +85,16 @@ public void shouldCreateLocalFileWriter() throws Exception { @Test public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { Record record1 = Mockito.mock(Record.class); - Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone))); + Mockito.when(record1.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record1.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1); Mockito.when(localFileWriter1.write(record1)).thenReturn(true); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test1"); Record record2 = Mockito.mock(Record.class); - Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L)); + Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone))); + Mockito.when(record2.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L)); Mockito.when(record2.getTopic("")).thenReturn(defaultTopic); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2); Mockito.when(localFileWriter2.write(record2)).thenReturn(true); @@ -106,7 +111,8 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception { @Test(expected = IOException.class) public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception { Record record = Mockito.mock(Record.class); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); @@ -120,7 +126,8 @@ public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception { public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { expectedException.expect(LocalFileWriterFailedException.class); Record record = Mockito.mock(Record.class); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now()); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L)); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0)); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenThrow(new LocalFileWriterFailedException(new IOException("Some error"))); @@ -132,7 +139,8 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception { @Test public void shouldGetEmptyFlushedPath() throws Exception { Record record = Mockito.mock(Record.class); - Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getTimestampFromMessage(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L)); + Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone))); Mockito.when(record.getTopic("")).thenReturn(defaultTopic); Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test"); Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1); diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java index 411089ffa..87bbb725d 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java @@ -5,6 +5,7 @@ import com.gotocompany.firehose.config.BlobSinkConfig; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.blob.Constants; +import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy; import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy; import org.junit.Test; import org.mockito.Mockito; @@ -20,8 +21,9 @@ public void shouldDeleteFiles() throws Exception { BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); List metadataFieldDescriptor = new ArrayList<>(); List policies = new ArrayList<>(); + List globalWriterPolicies = new ArrayList<>(); FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class); - LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, firehoseInstrumentation); + LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, globalWriterPolicies, firehoseInstrumentation); LocalStorage spy = Mockito.spy(storage); Mockito.doNothing().when(spy).deleteLocalFile(Paths.get("/tmp/a"), Paths.get("/tmp/.a.crc")); Mockito.when(sinkConfig.getLocalFileWriterType()).thenReturn(Constants.WriterType.PARQUET); diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java index b67359b4c..8771123e3 100644 --- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/TimePartitionedPathUtilsTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.DynamicMessage; import com.gotocompany.firehose.config.BlobSinkConfig; +import com.gotocompany.firehose.config.enums.TimePartitionType; import com.gotocompany.firehose.sink.blob.Constants; import com.gotocompany.firehose.sink.blob.TestProtoMessage; import com.gotocompany.firehose.sink.blob.TestUtils; @@ -41,6 +42,7 @@ public void shouldCreateDayPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn("date="); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(""); @@ -58,6 +60,7 @@ public void shouldCreateHourPartitioningPath() { Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn(kafkaMetadataFieldName); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -91,6 +94,7 @@ public void shouldCreatePartitionPathWhenKafkaMetadataIsNotNested() { BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix); Mockito.when(sinkConfig.getFilePartitionTimeHourPrefix()).thenReturn(hourPrefix); @@ -107,6 +111,7 @@ public void shouldCreatePartitioningPathForNestedKafkaMetadata() { Record record = new Record(message, metadata); BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class); Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone); + Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP); Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.DAY); Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName); Mockito.when(sinkConfig.getFilePartitionTimeDatePrefix()).thenReturn(datePrefix);