Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.gotocompany.firehose.config.converter.BlobSinkFilePartitionTypeConverter;
import com.gotocompany.firehose.config.converter.BlobSinkLocalFileWriterTypeConverter;
import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter;
import com.gotocompany.firehose.config.enums.TimePartitionType;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;
import com.gotocompany.firehose.sink.blob.Constants;

Expand Down Expand Up @@ -49,6 +50,15 @@ public interface BlobSinkConfig extends AppConfig {
@Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
String getFilePartitionProtoTimestampFieldName();

/**
 * Size threshold, in bytes, read from {@code SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES}.
 * Defaults to 268435456 bytes (256 MiB) when the key is not set.
 *
 * @return the configured global file-rotation size limit in bytes
 */
@Key("SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES")
@DefaultValue("268435456")
long getGlobalFileRotationMaxSizeBytes();


/**
 * Selects which timestamp is used for time-based file partitioning, read from
 * {@code SINK_BLOB_FILE_PARTITION_TIME_TYPE}. Defaults to {@code EVENT_TIMESTAMP}
 * when the key is not set.
 *
 * @return the configured {@link TimePartitionType}
 */
@Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE")
@DefaultValue("EVENT_TIMESTAMP")
TimePartitionType getFilePartitionTimeType();

@Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE")
@DefaultValue("day")
@ConverterClass(BlobSinkFilePartitionTypeConverter.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.gotocompany.firehose.config.enums;

/**
 * Source of the timestamp used when a record is mapped to a date/time value
 * for time-partitioned output paths.
 */
public enum TimePartitionType {
/** Use the message timestamp stored in the record's Kafka metadata. */
MESSAGE_TIMESTAMP,
/** Use the current wall-clock time at the moment the record is handled. */
PROCESSING_TIMESTAMP,
/** Use a timestamp field taken from the message payload itself. */
EVENT_TIMESTAMP

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.gotocompany.firehose.sink.blob.message.MessageDeSerializer;
import com.gotocompany.firehose.sink.blob.writer.WriterOrchestrator;
import com.gotocompany.firehose.sink.blob.writer.local.LocalStorage;
import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.SizeBasedRotatingPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.TimeBasedRotatingPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
Expand Down Expand Up @@ -53,13 +54,16 @@ private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig,
Descriptors.Descriptor outputMessageDescriptor = stencilClient.get(sinkConfig.getInputSchemaProtoClass());
Descriptors.Descriptor metadataMessageDescriptor = getMetadataMessageDescriptor(sinkConfig);
List<WriterPolicy> writerPolicies = new ArrayList<>();
List<GlobalWriterPolicy> globalWriterPolicies = new ArrayList<>();
writerPolicies.add(new TimeBasedRotatingPolicy(sinkConfig.getLocalFileRotationDurationMS()));
writerPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getLocalFileRotationMaxSizeBytes()));
globalWriterPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getGlobalFileRotationMaxSizeBytes()));
return new LocalStorage(
sinkConfig,
outputMessageDescriptor,
metadataMessageDescriptor.getFields(),
writerPolicies,
globalWriterPolicies,
new FirehoseInstrumentation(statsDReporter, LocalStorage.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.sink.blob.proto.KafkaMetadataProtoMessage;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

@AllArgsConstructor
@Data
Expand All @@ -26,12 +29,47 @@ public String getTopic(String fieldName) {
return (String) metadata.getField(metadataDescriptor.findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TOPIC_FIELD_NAME));
}

public Instant getTimestamp(String fieldName) {
Descriptors.Descriptor descriptor = message.getDescriptorForType();
/**
 * Reads the Kafka message timestamp from this record's metadata.
 *
 * <p>When {@code metadataColumnName} is non-empty the Kafka metadata is looked up inside the
 * nested message stored under that field; otherwise it is read directly from the top-level
 * metadata message. The timestamp field is the one named by
 * {@code KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME}.
 *
 * <p>Values are accessed through the generic {@code Message} interface, so the lookup works
 * whether a field holds a generated {@code Timestamp} or a {@code DynamicMessage}.
 *
 * @param metadataColumnName name of the nested metadata field, or an empty string when the
 *                           metadata is not nested; must not be {@code null}
 * @return the message timestamp as an {@link Instant}
 */
public Instant getMessageTimeStamp(String metadataColumnName) {
    // Start at the top-level metadata and descend one level only when a column name is set.
    com.google.protobuf.Message holder = metadata;
    if (!metadataColumnName.isEmpty()) {
        holder = (com.google.protobuf.Message) metadata.getField(
                metadata.getDescriptorForType().findFieldByName(metadataColumnName));
    }
    com.google.protobuf.Message timestamp = (com.google.protobuf.Message) holder.getField(
            holder.getDescriptorForType().findFieldByName(KafkaMetadataProtoMessage.MESSAGE_TIMESTAMP_FIELD_NAME));
    Descriptors.Descriptor timestampDescriptor = timestamp.getDescriptorForType();
    long seconds = (long) timestamp.getField(timestampDescriptor.findFieldByName("seconds"));
    int nanos = (int) timestamp.getField(timestampDescriptor.findFieldByName("nanos"));
    return Instant.ofEpochSecond(seconds, nanos);
}

/**
 * Reads the named timestamp field from this record's message payload.
 *
 * @param fieldName name of the timestamp field on the payload message
 * @return the field's value converted to an {@link Instant}
 */
public Instant getTimestampFromMessage(String fieldName) {
    final DynamicMessage payload = this.message;
    return this.getTimeStampFromDescriptor(fieldName, payload);
}

/**
 * Extracts a timestamp-shaped field (with {@code seconds} and {@code nanos} sub-fields)
 * from the given message and converts it to an {@link Instant}.
 *
 * @param fieldName name of the timestamp field to look up on {@code m}
 * @param m         message to read the field from
 * @return the timestamp value as an {@link Instant}
 */
public Instant getTimeStampFromDescriptor(String fieldName, DynamicMessage m) {
    Descriptors.FieldDescriptor timestampField = m.getDescriptorForType().findFieldByName(fieldName);
    // Read from the message passed in, not from this record's own payload.
    DynamicMessage timestamp = (DynamicMessage) m.getField(timestampField);
    Descriptors.Descriptor timestampDescriptor = timestamp.getDescriptorForType();
    long seconds = (long) timestamp.getField(timestampDescriptor.findFieldByName("seconds"));
    int nanos = (int) timestamp.getField(timestampDescriptor.findFieldByName("nanos"));
    return Instant.ofEpochSecond(seconds, nanos);
}

/**
 * Resolves the date-time of this record according to the configured partition time type.
 *
 * <ul>
 *   <li>{@code MESSAGE_TIMESTAMP}: the Kafka metadata timestamp, rendered in UTC.</li>
 *   <li>{@code PROCESSING_TIMESTAMP}: {@link LocalDateTime#now()}, i.e. the current time in
 *       the JVM's default time zone.</li>
 *   <li>any other value: the configured proto timestamp field of the payload, rendered in
 *       the configured time zone.</li>
 * </ul>
 *
 * @param config sink configuration supplying the time type, field names and time zone
 * @return the resolved local date-time
 */
public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
    final Instant instant;
    final ZoneId zone;
    switch (config.getFilePartitionTimeType()) {
        case PROCESSING_TIMESTAMP:
            // No instant/zone pair here: the wall clock is read directly.
            return LocalDateTime.now();
        case MESSAGE_TIMESTAMP:
            instant = getMessageTimeStamp(config.getOutputKafkaMetadataColumnName());
            zone = ZoneId.of("UTC");
            break;
        default:
            instant = getTimestampFromMessage(config.getFilePartitionProtoTimestampFieldName());
            zone = ZoneId.of(config.getFilePartitionProtoTimestampTimezone());
            break;
    }
    return LocalDateTime.ofInstant(instant, zone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ public LocalFileChecker(Queue<LocalFileMetadata> toBeFlushedToRemotePaths,
@Override
public void run() {
firehoseInstrumentation.captureValue(BlobStorageMetrics.LOCAL_FILE_OPEN_TOTAL, timePartitionWriterMap.size());
Map<Path, LocalFileWriter> toBeRotated =
timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Path, LocalFileWriter> toBeRotated;
if (localStorage.shouldRotate(timePartitionWriterMap.values())) {
// rotate all
toBeRotated = timePartitionWriterMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} else {
toBeRotated =
timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
timePartitionWriterMap.entrySet().removeAll(toBeRotated.entrySet());
toBeRotated.forEach((path, writer) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.exception.ConfigurationException;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
import lombok.AllArgsConstructor;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@AllArgsConstructor
public class LocalStorage {
Expand All @@ -21,6 +24,7 @@ public class LocalStorage {
private final Descriptors.Descriptor messageDescriptor;
private final List<Descriptors.FieldDescriptor> metadataFieldDescriptor;
private final List<WriterPolicy> policies;
private final List<GlobalWriterPolicy> globalPolicies;
private final FirehoseInstrumentation firehoseInstrumentation;

public LocalFileWriter createLocalFileWriter(Path partitionPath) {
Expand All @@ -31,7 +35,7 @@ public LocalFileWriter createLocalFileWriter(Path partitionPath) {
return createWriter(basePath, fullPath);
}

private LocalParquetFileWriter createWriter(Path basePath, Path fullPath) {
private LocalFileWriter createWriter(Path basePath, Path fullPath) {
switch (sinkConfig.getLocalFileWriterType()) {
case PARQUET:
try {
Expand Down Expand Up @@ -78,4 +82,10 @@ public void deleteLocalFile(Path... paths) throws IOException {
/**
 * Checks whether any per-file policy asks for the given writer to be rotated.
 *
 * @param writer the local file writer to evaluate
 * @return {@code true} as soon as one policy accepts the writer's metadata, otherwise {@code false}
 */
public Boolean shouldRotate(LocalFileWriter writer) {
    for (WriterPolicy policy : policies) {
        if (policy.shouldRotate(writer.getMetadata())) {
            return true;
        }
    }
    return false;
}

/**
 * Checks whether any global policy asks for rotation, given all writers taken together.
 *
 * <p>The writers' metadata is collected once and the same list is handed to every global
 * policy, so each policy evaluates the same snapshot.
 *
 * @param writers the currently open local file writers
 * @return {@code true} if at least one global policy matches, otherwise {@code false}
 */
public Boolean shouldRotate(Collection<LocalFileWriter> writers) {
    // With no global policies there is nothing to evaluate, so the writers are left untouched.
    if (globalPolicies.isEmpty()) {
        return false;
    }
    List<LocalFileMetadata> metadataSnapshot = writers.stream()
            .map(LocalFileWriter::getMetadata)
            .collect(Collectors.toList());
    return globalPolicies.stream().anyMatch(policy -> policy.shouldRotate(metadataSnapshot));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
Expand All @@ -25,22 +21,21 @@ public class TimePartitionedPathUtils {

public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConfig) {
String topic = record.getTopic(sinkConfig.getOutputKafkaMetadataColumnName());
Instant timestamp = record.getTimestamp(sinkConfig.getFilePartitionProtoTimestampFieldName());
Path path = Paths.get(topic);
if (sinkConfig.getFilePartitionTimeGranularityType() == Constants.FilePartitionType.NONE) {
return Paths.get(topic);
return path;
}
LocalDate localDate = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalDate();
String datePart = DATE_FORMATTER.format(localDate);
LocalTime localTime = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalTime();
String hourPart = HOUR_FORMATTER.format(localTime);
LocalDateTime dateTime = record.getLocalDateTime(sinkConfig);
String datePart = DATE_FORMATTER.format(dateTime.toLocalDate());
String hourPart = HOUR_FORMATTER.format(dateTime.toLocalTime());

String dateSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeDatePrefix(), datePart);
String hourSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeHourPrefix(), hourPart);

String dateTimePartition;
switch (sinkConfig.getFilePartitionTimeGranularityType()) {
case NONE:
return Paths.get(topic);
return path;
case DAY:
dateTimePartition = String.format("%s", dateSegment);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.gotocompany.firehose.sink.blob.writer.local.policy;

import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;

import java.util.List;

/**
 * Rotation policy that is evaluated against the metadata of a group of local files
 * together, rather than against a single file.
 */
public interface GlobalWriterPolicy {

/**
 * Decides whether rotation should happen for the given set of files.
 *
 * @param metadata metadata of the local files to evaluate as a whole
 * @return {@code true} if the policy asks for rotation
 */
boolean shouldRotate(List<LocalFileMetadata> metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;

public class SizeBasedRotatingPolicy implements WriterPolicy {
import java.util.List;

public class SizeBasedRotatingPolicy implements WriterPolicy, GlobalWriterPolicy {

private final long maxSize;

Expand All @@ -17,4 +19,10 @@ public SizeBasedRotatingPolicy(long maxSize) {
public boolean shouldRotate(LocalFileMetadata metadata) {
    // Rotate once the file has reached (or passed) the configured maximum size.
    final long currentSize = metadata.getSize();
    return !(currentSize < maxSize);
}

/**
 * Global variant: compares the combined size of all given files with the maximum size.
 *
 * @param metadataList metadata of the files whose sizes are added together
 * @return {@code true} when the summed size is at least the configured maximum
 */
@Override
public boolean shouldRotate(List<LocalFileMetadata> metadataList) {
    // Sum on a primitive LongStream to avoid boxing every file size.
    long totalSize = metadataList.stream().mapToLong(LocalFileMetadata::getSize).sum();
    return totalSize >= maxSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.gotocompany.firehose.sink.blob.message;

import com.google.protobuf.DynamicMessage;
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.config.enums.TimePartitionType;
import com.gotocompany.firehose.sink.blob.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class RecordTest {

private final Instant defaultTimestamp = Instant.parse("2020-01-01T10:00:00.000Z");
private final Instant messageTimeStamp = Instant.parse("2020-01-02T10:00:00.000Z");
private final int defaultOrderNumber = 100;
private final long defaultOffset = 1L;
private final int defaultPartition = 1;
Expand Down Expand Up @@ -37,6 +43,48 @@ public void shouldGetTimeStampFromMessage() {
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
Record record = new Record(message, metadata);
Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time"));
Assert.assertEquals(defaultTimestamp, record.getTimestampFromMessage("created_time"));
}

/**
 * With PROCESSING_TIMESTAMP the record must report the current wall-clock time.
 * The value is bracketed between two clock reads using inclusive bounds, so the test
 * needs no sleeps and still holds when all three reads land on the same clock tick.
 */
@Test
public void shouldGetDateTimeLocally() {
    BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
    Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.PROCESSING_TIMESTAMP);
    DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
    DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
    Record record = new Record(message, metadata);
    LocalDateTime before = LocalDateTime.now();
    LocalDateTime localDateTime = record.getLocalDateTime(config);
    LocalDateTime after = LocalDateTime.now();
    Assert.assertFalse(localDateTime.isBefore(before));
    Assert.assertFalse(localDateTime.isAfter(after));
}

/**
 * With EVENT_TIMESTAMP the date-time must come from the payload's "created_time" field,
 * rendered in the configured time zone.
 */
@Test
public void shouldGetDateTimeFromMessage() throws InterruptedException {
    final String timezone = "UTC";
    BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
    Mockito.when(sinkConfig.getFilePartitionTimeType()).thenReturn(TimePartitionType.EVENT_TIMESTAMP);
    Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
    Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(timezone);
    DynamicMessage payload = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
    DynamicMessage kafkaMetadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
    Record record = new Record(payload, kafkaMetadata);
    LocalDateTime expected = LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of(timezone));
    Assert.assertEquals(expected, record.getLocalDateTime(sinkConfig));
}
/**
 * With MESSAGE_TIMESTAMP the date-time must come from the Kafka metadata timestamp
 * (rendered in UTC), not from the payload's own timestamp field. Only the config values
 * that this path reads are stubbed; the payload and metadata deliberately carry
 * different timestamps so a mix-up would be detected.
 */
@Test
public void shouldGetDateTimeFromKafkaMessage() {
    BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
    Mockito.when(config.getFilePartitionTimeType()).thenReturn(TimePartitionType.MESSAGE_TIMESTAMP);
    Mockito.when(config.getOutputKafkaMetadataColumnName()).thenReturn("nested_field");
    DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
    DynamicMessage metadata = TestUtils.createMetadata("nested_field", messageTimeStamp, defaultOffset, defaultPartition, defaultTopic);
    Record record = new Record(message, metadata);
    LocalDateTime localDateTime = record.getLocalDateTime(config);
    Assert.assertEquals(LocalDateTime.ofInstant(messageTimeStamp, ZoneId.of("UTC")), localDateTime);
}
}
Loading