diff --git a/env/local.properties b/env/local.properties index 1908b7b76..805d1ee47 100644 --- a/env/local.properties +++ b/env/local.properties @@ -19,6 +19,7 @@ INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage # APPLICATION_THREAD_COUNT=2 # TRACE_JAEGAR_ENABLE=true # LOG_LEVEL=info +# INPUT_SCHEMA_DATA_TYPE=protobuf # # ############################################# @@ -122,7 +123,6 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id # SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret # SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info # -# ############################################# # ## INFLUX SINK diff --git a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java index ec3763e01..ce103fcfa 100644 --- a/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java +++ b/src/main/java/com/gotocompany/firehose/consumer/kafka/FirehoseKafkaConsumer.java @@ -57,9 +57,12 @@ public List readMessages() { List messages = new ArrayList<>(); for (ConsumerRecord record : records) { - messages.add(new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis())); + Message msg = new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis()); + msg.setInputSchemaType(consumerConfig.getInputSchemaType()); + messages.add(msg); firehoseInstrumentation.logDebug("Pulled record: {}", record); } + return messages; } diff --git a/src/main/java/com/gotocompany/firehose/message/Message.java b/src/main/java/com/gotocompany/firehose/message/Message.java index 0ba03f14f..04623e85b 100644 --- a/src/main/java/com/gotocompany/firehose/message/Message.java +++ b/src/main/java/com/gotocompany/firehose/message/Message.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.message; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DefaultException; import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; @@ -29,6 +30,8 @@ public class Message { private long consumeTimestamp; @Setter private ErrorInfo errorInfo; + @Setter + private InputSchemaType inputSchemaType; public void setDefaultErrorIfNotPresent() { if (errorInfo == null) { @@ -54,7 +57,7 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo } /** - * Instantiates a new Message without providing errorType. + * Instantiates a new Message without providing errorType and inputSchemaType. * * @param logKey * @param logMessage @@ -66,7 +69,7 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo * @param consumeTimestamp */ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp) { - this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null); + this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null, InputSchemaType.PROTOBUF); } public Message(Message message, ErrorInfo errorInfo) { @@ -78,7 +81,23 @@ public Message(Message message, ErrorInfo errorInfo) { message.getHeaders(), message.getTimestamp(), message.getConsumeTimestamp(), - errorInfo); + errorInfo, + message.getInputSchemaType() + ); + } + + public Message(Message message, ErrorInfo errorInfo, InputSchemaType inputSchemaType) { + this(message.getLogKey(), + message.getLogMessage(), + message.getTopic(), + message.getPartition(), + message.getOffset(), + message.getHeaders(), + message.getTimestamp(), + message.getConsumeTimestamp(), + errorInfo, + inputSchemaType + ); } /** diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java index 69daeaa38..8131f4b13 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToJson.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.exception.DeserializerException; import com.google.gson.ExclusionStrategy; @@ -17,6 +18,7 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.Collections; import java.util.HashMap; @@ -55,6 +57,17 @@ public String serialize(Message message) throws DeserializerException { JSONObject jsonObject = new JSONObject(); jsonObject.put("topic", message.getTopic()); + if (message.getInputSchemaType() == InputSchemaType.JSON) { + JSONParser parser = new JSONParser(); + JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + jsonObject.put("logMessage", gson.toJson(json)); + if (message.getLogKey() != null && message.getLogKey().length != 0) { + jsonObject.put("logKey", new String(message.getLogKey(), StandardCharsets.UTF_8)); + } + + return jsonObject.toJSONString(); + } + if (message.getLogKey() != null && message.getLogKey().length != 0) { DynamicMessage key = protoParser.parse(message.getLogKey()); jsonObject.put("logKey", this.gson.toJson(convertDynamicMessageToJson(key))); diff --git a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java index ee42fb0e5..dbef17c88 100644 --- a/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java +++ b/src/main/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJson.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.exception.ConfigurationException; @@ -12,9 +13,11 @@ import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; import com.gotocompany.stencil.Parser; +import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,9 +80,18 @@ public String serialize(Message message) throws DeserializerException { try { String jsonMessage; String jsonString; - // only supports messages not keys - DynamicMessage msg = protoParser.parse(message.getLogMessage()); - jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg); + + if (message.getInputSchemaType() == InputSchemaType.JSON) { + System.out.println(new String(message.getLogMessage())); + JSONObject json = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + System.out.println(json.toJSONString()); + jsonMessage = json.toJSONString(); + } else { + // only supports messages not keys + DynamicMessage msg = protoParser.parse(message.getLogMessage()); + jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg); + } + String finalMessage = httpSinkJsonBodyTemplate; for (String path : pathsToReplace) { if (path.equals(ALL_FIELDS_FROM_TEMPLATE)) { @@ -91,7 +103,8 @@ public String serialize(Message message) throws DeserializerException { finalMessage = finalMessage.replace(path, jsonString); } return finalMessage; - } catch (InvalidProtocolBufferException | PathNotFoundException e) { + } catch (InvalidProtocolBufferException | ParseException | PathNotFoundException e) { + e.printStackTrace(); throw new DeserializerException(e.getMessage()); } } diff --git a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java index 923d08471..7f5331d1f 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/factory/SerializerFactory.java @@ -2,6 +2,7 @@ import com.gotocompany.firehose.config.HttpSinkConfig; import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.serializer.JsonWrappedProtoByte; import com.gotocompany.firehose.serializer.MessageSerializer; @@ -24,12 +25,16 @@ public class SerializerFactory { public MessageSerializer build() { FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, SerializerFactory.class); - if (isProtoSchemaEmpty() || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) { + if ( (httpSinkConfig.getInputSchemaType() == InputSchemaType.PROTOBUF && isProtoSchemaEmpty()) || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) { firehoseInstrumentation.logDebug("Serializer type: JsonWrappedProtoByte"); // Fallback to json wrapped proto byte + + // todo(sushmith): + // here output is proto, but input expected is also proto. + // need to have json as input and proto as output. + // this is currently not possible because of the way we are using the parser. return new JsonWrappedProtoByte(); } - if (httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.JSON) { Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass()); if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) { diff --git a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java index c957d12e9..0da069510 100644 --- a/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java +++ b/src/main/java/com/gotocompany/firehose/sink/http/request/uri/UriParser.java @@ -1,6 +1,9 @@ package com.gotocompany.firehose.sink.http.request.uri; +import com.google.gson.JsonObject; +import com.gotocompany.firehose.config.enums.InputSchemaType; +import com.gotocompany.firehose.exception.JsonParseException; import com.gotocompany.firehose.message.Message; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -8,7 +11,11 @@ import com.gotocompany.stencil.Parser; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -18,16 +25,23 @@ public class UriParser { private Parser protoParser; private String parserMode; + private JSONParser jsonParser; + public UriParser(Parser protoParser, String parserMode) { this.protoParser = protoParser; this.parserMode = parserMode; + this.jsonParser = new JSONParser(); } - public String parse(Message message, String serviceUrl) { - DynamicMessage parsedMessage = parseEsbMessage(message); - return parseServiceUrl(parsedMessage, serviceUrl); + public UriParser(Parser protoParser, String parserMode, JSONParser jsonParser) { + this.protoParser = protoParser; + this.parserMode = parserMode; + this.jsonParser = jsonParser; + } + public String parse(Message message, String serviceUrl) { + return parseServiceUrl(message, serviceUrl); } private DynamicMessage parseEsbMessage(Message message) { @@ -40,7 +54,18 @@ private DynamicMessage parseEsbMessage(Message message) { return parsedMessage; } - private String parseServiceUrl(DynamicMessage data, String serviceUrl) { + private JSONObject parseJsonMessage(Message message) { + JSONObject jsonObject; + try { + jsonObject = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8)); + } catch (ParseException e) { + throw new JsonParseException(e.getMessage(), e.getCause()); + } + return jsonObject; + } + + + private String parseServiceUrl(Message message, String serviceUrl) { if (StringUtils.isEmpty(serviceUrl)) { throw new IllegalArgumentException("Service URL '" + serviceUrl + "' is invalid"); } @@ -55,16 +80,43 @@ private String parseServiceUrl(DynamicMessage data, String serviceUrl) { String urlPattern = urlStrings[0]; String urlVariables = StringUtils.join(Arrays.copyOfRange(urlStrings, 1, urlStrings.length), ","); - String renderedUrl = renderStringUrl(data, urlPattern, urlVariables); - return StringUtils.isEmpty(urlVariables) - ? urlPattern - : renderedUrl; + + if (StringUtils.isEmpty(urlVariables)) { + return urlPattern; + } + + String renderedUrl; + if (message.getInputSchemaType() == InputSchemaType.JSON) { + JSONObject json = parseJsonMessage(message); + renderedUrl = renderStringUrl(json, urlPattern, urlVariables); + } else { + // InputSchemaType.PROTOBUF + DynamicMessage data = parseEsbMessage(message); + renderedUrl = renderStringUrl(data, urlPattern, urlVariables); + } + + return renderedUrl; } - private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) { - if (StringUtils.isEmpty(patternVariables)) { - return pattern; + private String renderStringUrl(JSONObject jsonObject, String pattern, String patternVariables) { + List patternVariablesList = Arrays.asList(patternVariables.split(",")); + Object[] patternVariableData = patternVariablesList + .stream() + .map(field -> getDataByFieldName(jsonObject, field)) + .toArray(); + return String.format(pattern, patternVariableData); + } + + private Object getDataByFieldName(JSONObject jsonObject, String fieldName) { + if (!jsonObject.containsKey(fieldName)) { + throw new IllegalArgumentException("Invalid json field name: " + fieldName); } + + return jsonObject.get(fieldName); + } + + + private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) { List patternVariableFieldNumbers = Arrays.asList(patternVariables.split(",")); Object[] patternVariableData = patternVariableFieldNumbers .stream() diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java index 4d03e9cb3..517dd375e 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToJsonTest.java @@ -1,5 +1,6 @@ package com.gotocompany.firehose.serializer; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.consumer.TestAggregatedSupplyMessage; @@ -9,6 +10,7 @@ import org.junit.Before; import org.junit.Test; +import java.nio.charset.StandardCharsets; import java.util.Base64; import static org.junit.Assert.assertEquals; @@ -16,6 +18,8 @@ public class MessageToJsonTest { private String logMessage; private String logKey; + private String logMessageJSONString; + private String logKeyJSONString; private Parser protoParser; @Before @@ -24,6 +28,20 @@ public void setUp() { protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; + + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; + logKeyJSONString = "sample-key1"; + } + + public byte[] stringToByteArray(String inputString) { + return StandardCharsets.UTF_8.encode(inputString).array(); } @Test @@ -42,6 +60,23 @@ public void shouldProperlySerializeEsbMessage() throws DeserializerException { + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"}"); } + + @Test + public void shouldProperlySerializeJsonInputMessage() throws DeserializerException { + MessageToJson messageToJson = new MessageToJson(protoParser, false, true); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedOutput = "{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + + "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + + "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\"," + + "\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\",\\\"s2Id\\\":\\\"3344472187078705152\\\"}\"," + + "\"topic\":\"sample-topic\",\"logKey\":\"sample-key1\"}"; + + String actualOutput = messageToJson.serialize(message); + assertEquals(expectedOutput, actualOutput); + } + @Test public void shouldSerializeWhenKeyIsMissing() throws DeserializerException { MessageToJson messageToJson = new MessageToJson(protoParser, false, true); @@ -55,6 +90,21 @@ public void shouldSerializeWhenKeyIsMissing() throws DeserializerException { + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}", actualOutput); } + @Test + public void shouldSerializeJSONInputMessageWhenKeyIsMissing() throws DeserializerException { + MessageToJson messageToJson = new MessageToJson(protoParser, false, true); + Message message = new Message(null, logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String actualOutput = messageToJson.serialize(message); + assertEquals("{\"logMessage\":\"{\\\"uniqueDrivers\\\":\\\"3\\\"," + + "\\\"windowStartTime\\\":\\\"Mar 20, 2017 10:54:00 AM\\\"," + + "\\\"windowEndTime\\\":\\\"Mar 20, 2017 10:55:00 AM\\\",\\\"s2IdLevel\\\":13,\\\"vehicleType\\\":\\\"BIKE\\\"," + + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}", actualOutput); + + + } + @Test public void shouldSerializeWhenKeyIsEmptyWithTimestampsAsSimpleDateFormatWhenFlagIsEnabled() throws DeserializerException { MessageToJson messageToJson = new MessageToJson(protoParser, false, true); @@ -113,3 +163,5 @@ public void shouldReturnTheTimestampFieldsInISOFormatIfSimpleDateFormatIsDisable + "\\\"s2Id\\\":\\\"3344472187078705152\\\"}\",\"topic\":\"sample-topic\"}]", actualOutput); } } + + diff --git a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java index 5aaedd5a2..484164eb3 100644 --- a/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java +++ b/src/test/java/com/gotocompany/firehose/serializer/MessageToTemplatizedJsonTest.java @@ -1,8 +1,7 @@ package com.gotocompany.firehose.serializer; - - +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.ConfigurationException; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; @@ -41,12 +40,23 @@ public class MessageToTemplatizedJsonTest { private String logMessage; private String logKey; + private String logMessageJSONString; + private String logKeyJSONString; @Before public void setup() { initMocks(this); logMessage = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigCMAM\u003d"; logKey = "CgYIyOm+xgUSBgiE6r7GBRgNIICAgIDA9/y0LigC"; + logMessageJSONString = "{\n" + + " \"uniqueDrivers\": \"3\",\n" + + " \"windowStartTime\": \"Mar 20, 2017 10:54:00 AM\",\n" + + " \"windowEndTime\": \"Mar 20, 2017 10:55:00 AM\",\n" + + " \"s2IdLevel\": 13,\n" + + " \"vehicleType\": \"BIKE\",\n" + + " \"s2Id\": \"3344472187078705152\"\n" + + " }"; + logKeyJSONString = "sample-key1"; } @Test @@ -64,6 +74,23 @@ public void shouldProperlySerializeMessageToTemplateWithSingleUnknownField() { Assert.assertEquals(expectedMessage, serializedMessage); } + @Test + public void shouldProperlySerializeJsonInputMessageToTemplateWithSingleKnownField() throws DeserializerException { + String template = "{\"test\":\"$.vehicleType\"}"; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedMessage = "{\"test\":\"BIKE\"}"; + + String serializedMessage = messageToTemplatizedJson.serialize(message); + + Assert.assertEquals(expectedMessage, serializedMessage); + } + @Test public void shouldProperlySerializeMessageToTemplateWithAsItIs() { String template = "\"$._all_\""; @@ -86,6 +113,26 @@ public void shouldProperlySerializeMessageToTemplateWithAsItIs() { Assert.assertEquals(expectedMessage, serializedMessage); } + @Test + public void shouldProperlySerializeJsonInputMessageToTemplateAsItIs() throws DeserializerException { + String template = "\"$._all_\""; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + String expectedMessage = "{\"uniqueDrivers\":\"3\"," + + "\"windowStartTime\":\"Mar 20, 2017 10:54:00 AM\"," + + "\"windowEndTime\":\"Mar 20, 2017 10:55:00 AM\",\"s2IdLevel\":13," + + "\"vehicleType\":\"BIKE\",\"s2Id\":\"3344472187078705152\"}"; + + String serializedMessage = messageToTemplatizedJson.serialize(message); + + Assert.assertEquals(expectedMessage, serializedMessage); + } + @Test public void shouldThrowIfNoPathsFoundInTheProto() { expectedException.expect(DeserializerException.class); @@ -102,6 +149,22 @@ public void shouldThrowIfNoPathsFoundInTheProto() { messageToTemplatizedJson.serialize(message); } + @Test + public void shouldThrowIfNoPathsFoundInTheJSON() { + expectedException.expect(DeserializerException.class); + expectedException.expectMessage("No results for path: $['invalidPath']"); + + String template = "{\"test\":\"$.invalidPath\"}"; + StencilClient stencilClient = StencilClientFactory.getClient(); + protoParser = stencilClient.getParser(TestAggregatedSupplyMessage.class.getName()); + MessageToTemplatizedJson messageToTemplatizedJson = MessageToTemplatizedJson + .create(firehoseInstrumentation, template, protoParser); + Message message = new Message(logKeyJSONString.getBytes(), logMessageJSONString.getBytes(), "sample-topic", 0, 100); + message.setInputSchemaType(InputSchemaType.JSON); + + messageToTemplatizedJson.serialize(message); + } + @Test public void shouldFailForNonJsonTemplate() { expectedException.expect(ConfigurationException.class); diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java index 5381b9162..03995c36d 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/LogDlqWriterTest.java @@ -2,6 +2,7 @@ import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; import com.gotocompany.firehose.sink.dlq.log.LogDlqWriter; @@ -35,7 +36,7 @@ public void setUp() throws Exception { @Test public void shouldWriteMessagesToLog() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); @@ -51,7 +52,7 @@ public void shouldWriteMessagesToLog() throws IOException { @Test public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message(null, "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String value = new String(message.getLogMessage()); ErrorInfo errorInfo = message.getErrorInfo(); @@ -66,7 +67,7 @@ public void shouldWriteMessagesToLogWhenKeyIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), null, "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); ErrorInfo errorInfo = message.getErrorInfo(); @@ -81,7 +82,7 @@ public void shouldWriteMessagesToLogWhenValueIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, null, InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); @@ -95,7 +96,7 @@ public void shouldWriteMessagesToLogWhenErrorInfoIsNull() throws IOException { @Test public void shouldWriteMessagesToLogWhenErrorInfoExceptionIsNull() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, timestamp, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); String key = new String(message.getLogKey()); String value = new String(message.getLogMessage()); diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java index c55d6eed2..1c8c256eb 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java @@ -2,6 +2,7 @@ import com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.exception.DeserializerException; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage; @@ -37,12 +38,12 @@ public void setUp() throws Exception { @Test public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); @@ -58,12 +59,12 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept @Test public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new NullPointerException(), ErrorType.SINK_UNKNOWN_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new NullPointerException(), ErrorType.SINK_UNKNOWN_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.SINK_UNKNOWN_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.SINK_UNKNOWN_ERROR), InputSchemaType.PROTOBUF); List messages = Arrays.asList(message1, message2, message3, message4); Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); @@ -79,12 +80,12 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl @Test public void shouldThrowIOExceptionWhenWriteFileThrowIOException() throws IOException, BlobStorageException { long timestamp1 = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, timestamp1, timestamp1, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); long timestamp2 = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); - Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR)); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message4 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 4, null, timestamp2, timestamp2, new ErrorInfo(new DeserializerException(""), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); doThrow(new BlobStorageException("", "", new IOException())).when(blobStorage).store(anyString(), any(byte[].class)); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java index f6c0f8e4d..59a27e11b 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithDlqTest.java @@ -2,6 +2,7 @@ import com.gotocompany.firehose.config.DlqConfig; import com.gotocompany.firehose.config.ErrorConfig; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.error.ErrorHandler; import com.gotocompany.firehose.message.Message; import com.gotocompany.firehose.metrics.FirehoseInstrumentation; @@ -180,9 +181,9 @@ public void shouldNotThrowIOExceptionWhenFailOnMaxRetryAttemptDisabled() throws @Test public void shouldCommitOffsetsOfDlqMessagesWhenSinkManageOffset() throws IOException { long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); - Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); - Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); - Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR)); + Message message1 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message2 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 2, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); + Message message3 = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 3, null, 0, timestamp, new ErrorInfo(new IOException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF); ArrayList messages = new ArrayList<>(); messages.add(message1); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java index 2f7fd3ef6..8530f5e0f 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithFailHandlerTest.java @@ -1,6 +1,7 @@ package com.gotocompany.firehose.sinkdecorator; import com.gotocompany.firehose.config.ErrorConfig; +import com.gotocompany.firehose.config.enums.InputSchemaType; import com.gotocompany.firehose.error.ErrorHandler; import com.gotocompany.firehose.exception.SinkException; import com.gotocompany.firehose.message.Message; @@ -37,7 +38,7 @@ public void shouldThrowIOExceptionWhenMessageContainsConfiguredError() throws IO List messages = new LinkedList<>(); messages.add(new Message("".getBytes(), "".getBytes(), "basic", 1, 1, null, 0, 0, - new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR))); + new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF)); when(sink.pushMessage(anyList())).thenReturn(messages); @@ -54,7 +55,7 @@ public void shouldNotThrowIOExceptionWhenConfigIsNotSet() throws IOException { List messages = new LinkedList<>(); messages.add(new Message("".getBytes(), "".getBytes(), "basic", 1, 1, null, 0, 0, - new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR))); + new ErrorInfo(new RuntimeException(), ErrorType.DESERIALIZATION_ERROR), InputSchemaType.PROTOBUF)); when(sink.pushMessage(anyList())).thenReturn(messages); diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java index 44d23f96c..37d9a42a1 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java @@ -231,11 +231,13 @@ public void shouldThrowIOExceptionWhenExceedMaximumRetryAttempts() throws IOExce @Test public void shouldRetryMessagesWhenErrorTypesConfigured() throws IOException { - Message messageWithError = new Message("key".getBytes(), "value".getBytes(), "topic", 1, 1, null, 0, 0, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + InputSchemaType inputSchemaType = InputSchemaType.PROTOBUF; + ErrorInfo errorInfo = new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR); + Message messageWithError = new Message("key".getBytes(), "value".getBytes(), "topic", 1, 1, null, 0, 0, errorInfo, inputSchemaType); ArrayList messages = new ArrayList<>(); messages.add(messageWithError); - messages.add(new Message(message, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR))); + messages.add(new Message(message, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR), inputSchemaType)); when(sinkDecorator.pushMessage(anyList())).thenReturn(messages).thenReturn(new LinkedList<>()); HashSet errorTypes = new HashSet<>();