From 7ffa5b828f119671be8005d1637dc124eb8ade22 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Fri, 17 May 2024 10:03:33 +0700 Subject: [PATCH 1/4] update gradle build and local props --- build.gradle | 4 ++-- env/local.properties | 50 +++++++++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index dba4ca424..ae6e6874d 100644 --- a/build.gradle +++ b/build.gradle @@ -117,11 +117,11 @@ dependencies { protobuf { generatedFilesBaseDir = "$projectDir/src/generated" protoc { - artifact = "com.google.protobuf:protoc:3.1.0" + artifact = "com.google.protobuf:protoc:3.17.3" } plugins { grpc { - artifact = "io.grpc:protoc-gen-grpc-java:1.0.3" + artifact = "io.grpc:protoc-gen-grpc-java:1.60.1" } } generateProtoTasks { diff --git a/env/local.properties b/env/local.properties index 1908b7b76..f558f6eaa 100644 --- a/env/local.properties +++ b/env/local.properties @@ -9,8 +9,9 @@ ## Generic # KAFKA_RECORD_PARSER_MODE=message -SINK_TYPE=log -INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage +SINK_TYPE=http +INPUT_SCHEMA_PROTO_CLASS=stencil.One +INPUT_SCHEMA_DATA_TYPE=json # INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"order_number","2":"event_timestamp","3":"driver_id"} # METRIC_STATSD_HOST=localhost # METRIC_STATSD_PORT=8125 @@ -26,12 +27,12 @@ INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage ## Stencil Client # SCHEMA_REGISTRY_STENCIL_ENABLE=true -SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8081/descriptors.bin +SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8080/v1beta1/namespaces/quickstart/schemas/example # SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS=10000 -# SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES=3 -# SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS=60000 +SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES=1 +SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS=500 # SCHEMA_REGISTRY_STENCIL_FETCH_AUTH_BEARER_TOKEN=tcDpw34J8d1 -SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH=false +SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH=true SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING # SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS=900000 # @@ -41,9 +42,9 @@ SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING ## Kafka Consumer # SOURCE_KAFKA_BROKERS=localhost:9092 -SOURCE_KAFKA_TOPIC=test-topic +SOURCE_KAFKA_TOPIC=com.local.test # SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS=500 -# SOURCE_KAFKA_ASYNC_COMMIT_ENABLE=true +SOURCE_KAFKA_ASYNC_COMMIT_ENABLE=false # SOURCE_KAFKA_CONSUMER_CONFIG_SESSION_TIMEOUT_MS=10000 # SOURCE_KAFKA_COMMIT_ONLY_CURRENT_PARTITIONS_ENABLE=true # SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE=true @@ -104,23 +105,24 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id # ## HTTP SINK # -# SINK_HTTP_SERVICE_URL=http://http-service.test.io -# SINK_HTTP_REQUEST_METHOD=put -# SINK_HTTP_REQUEST_TIMEOUT_MS=10000 -# SINK_HTTP_MAX_CONNECTIONS=10 -# SINK_HTTP_RETRY_STATUS_CODE_RANGES=400-600 -# SINK_HTTP_DATA_FORMAT=proto -# SINK_HTTP_HEADERS=Authorization:auth_token, Accept:text/plain -# SINK_HTTP_JSON_BODY_TEMPLATE={"test":"$.routes[0]", "$.order_number" : "xxx"} -# SINK_HTTP_PARAMETER_SOURCE=disabled -# SINK_HTTP_PARAMETER_PLACEMENT=Header -# SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS=com.tests.TestMessage + SINK_HTTP_SERVICE_URL=http://localhost:8085/sink + SINK_HTTP_REQUEST_METHOD=put + SINK_HTTP_REQUEST_TIMEOUT_MS=10000 + SINK_HTTP_MAX_CONNECTIONS=10 + SINK_HTTP_RETRY_STATUS_CODE_RANGES=400-600 + SINK_HTTP_DATA_FORMAT=proto + SINK_HTTP_HEADERS=Content-Type:application/json + + SINK_HTTP_JSON_BODY_TEMPLATE={"mappedField":"$.field_one"} + SINK_HTTP_PARAMETER_SOURCE=Message + SINK_HTTP_PARAMETER_PLACEMENT=Header + SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS=stencil.One # INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"order_number","2":"event_timestamp","3":"driver_id"} -# SINK_HTTP_OAUTH2_ENABLE=false -# SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL=https://sample-oauth.my-api.com/oauth2/token -# SINK_HTTP_OAUTH2_CLIENT_NAME=client-name -# SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret -# SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info + SINK_HTTP_OAUTH2_ENABLE=false + SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL=https://sample-oauth.my-api.com/oauth2/token + SINK_HTTP_OAUTH2_CLIENT_NAME=client-name + SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret + SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info # # ############################################# From 8998e22e878d3bf0453294c8d8aa3b555a70b029 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 28 Oct 2024 16:16:45 +0700 Subject: [PATCH 2/4] fix: Fix batched request DLQ only sending message in the first index --- .../sink/common/AbstractHttpSink.java | 13 +++- .../firehose/sink/http/HttpSinkTest.java | 67 ++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java index 39388e3e1..d72f9bdce 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java @@ -61,7 +61,14 @@ public List execute() throws Exception { printRequest(httpRequests.get(i), contentStringList); } if (shouldRetry(response)) { - failedMessages.add(sourceMessages.get(i)); + if (sourceMessages.size() == httpRequests.size()) { + failedMessages.add(sourceMessages.get(i)); + } else { + int batchSize = sourceMessages.size() / httpRequests.size(); + int batchStartIndex = i * batchSize; + int batchEndIndex = Math.min((i + 1) * batchSize, sourceMessages.size()); + failedMessages.addAll(sourceMessages.subList(batchStartIndex, batchEndIndex)); + } } else if (!Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(response.getStatusLine().getStatusCode())).matches()) { contentStringList = contentStringList == null ? readContent(httpRequests.get(i)) : contentStringList; captureMessageDropCount(response, contentStringList); @@ -163,4 +170,8 @@ public Map getRetryStatusCodeRanges() { public Map getRequestLogStatusCodeRanges() { return requestLogStatusCodeRanges; } + + private void addFailedMessages() { + + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java index 13ca165e9..ed08d4f49 100644 --- a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java @@ -60,16 +60,14 @@ public class HttpSinkTest { private Map requestLogStatusCodeRanges; private List messages; + private final String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}"; + private final Message message = new Message(null, jsonString.getBytes(), "", 0, 1); @Before public void setup() { initMocks(this); messages = new ArrayList<>(); - - String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}"; - Message message = new Message(null, jsonString.getBytes(), "", 0, 1); - messages.add(message); messages.add(message); } @@ -113,6 +111,7 @@ public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); when(response.getEntity()).thenReturn(httpEntity); when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); @@ -120,12 +119,61 @@ public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessage, failedMessages.size()); } @Test - public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception { + public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsBatched() throws Exception { + when(response.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(500); + //Send messages in batch of two + List httpRequests = Arrays.asList(httpPut, httpPut); + messages.add(message); + messages.add(message); + + when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); + when(request.build(messages)).thenReturn(httpRequests); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); + + HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, + new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); + httpSink.prepare(messages); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(expectedFailedMessage, failedMessages.size()); + } + @Test + public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsIndividual() throws Exception { + when(response.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(500); + //Send messages in two batch each containing 1 message + List httpRequests = Arrays.asList(httpPut, httpPut); + + when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); + when(request.build(messages)).thenReturn(httpRequests); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); + + HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, + new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); + httpSink.prepare(messages); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(expectedFailedMessage, failedMessages.size()); + } + + @Test + public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception { List httpRequests = Arrays.asList(httpPut); when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); @@ -135,18 +183,18 @@ public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception when(request.build(messages)).thenReturn(httpRequests); when(httpClient.execute(httpPut)).thenReturn(null); when(httpPut.getMethod()).thenReturn("PUT"); + int expectedFailedMessageCount = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessageCount, failedMessages.size()); } @Test public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws Exception { - List httpRequests = Arrays.asList(httpPut); when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); @@ -157,13 +205,14 @@ public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws when(httpClient.execute(httpPut)).thenReturn(response); when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(0); + int expectedFailedMessage = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessage, failedMessages.size()); } @Test(expected = IOException.class) From 89328277e3d250596c9d66e9688542a2c8a99611 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 28 Oct 2024 16:19:49 +0700 Subject: [PATCH 3/4] chore: revert build.gradle --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 3dc3e2f1d..bdd91140b 100644 --- a/build.gradle +++ b/build.gradle @@ -117,11 +117,11 @@ dependencies { protobuf { generatedFilesBaseDir = "$projectDir/src/generated" protoc { - artifact = "com.google.protobuf:protoc:3.17.3" + artifact = "com.google.protobuf:protoc:3.1.0" } plugins { grpc { - artifact = "io.grpc:protoc-gen-grpc-java:1.60.1" + artifact = "io.grpc:protoc-gen-grpc-java:1.0.3" } } generateProtoTasks { From 9bb2c67c0e22c3ab9a99be12bac4b713468dccf1 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 28 Oct 2024 16:21:37 +0700 Subject: [PATCH 4/4] chore: revert unwanted changes --- env/local.properties | 51 +++++++++---------- .../sink/common/AbstractHttpSink.java | 4 -- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/env/local.properties b/env/local.properties index 9d8a3c4c7..878dc063c 100644 --- a/env/local.properties +++ b/env/local.properties @@ -9,9 +9,8 @@ ## Generic # KAFKA_RECORD_PARSER_MODE=message -SINK_TYPE=http -INPUT_SCHEMA_PROTO_CLASS=stencil.One -INPUT_SCHEMA_DATA_TYPE=json +SINK_TYPE=log +INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage # INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"order_number","2":"event_timestamp","3":"driver_id"} # METRIC_STATSD_HOST=localhost # METRIC_STATSD_PORT=8125 @@ -27,12 +26,12 @@ INPUT_SCHEMA_DATA_TYPE=json ## Stencil Client # SCHEMA_REGISTRY_STENCIL_ENABLE=true -SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8080/v1beta1/namespaces/quickstart/schemas/example +SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8081/descriptors.bin # SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS=10000 -SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES=1 -SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS=500 +# SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES=3 +# SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS=60000 # SCHEMA_REGISTRY_STENCIL_FETCH_AUTH_BEARER_TOKEN=tcDpw34J8d1 -SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH=true +SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH=false SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING # SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS=900000 # @@ -42,9 +41,9 @@ SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING ## Kafka Consumer # SOURCE_KAFKA_BROKERS=localhost:9092 -SOURCE_KAFKA_TOPIC=com.local.test +SOURCE_KAFKA_TOPIC=test-topic # SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS=500 -SOURCE_KAFKA_ASYNC_COMMIT_ENABLE=false +# SOURCE_KAFKA_ASYNC_COMMIT_ENABLE=true # SOURCE_KAFKA_CONSUMER_CONFIG_SESSION_TIMEOUT_MS=10000 # SOURCE_KAFKA_COMMIT_ONLY_CURRENT_PARTITIONS_ENABLE=true # SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE=true @@ -108,24 +107,24 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id # ## HTTP SINK # - SINK_HTTP_SERVICE_URL=http://localhost:8085/sink - SINK_HTTP_REQUEST_METHOD=put - SINK_HTTP_REQUEST_TIMEOUT_MS=10000 - SINK_HTTP_MAX_CONNECTIONS=10 - SINK_HTTP_RETRY_STATUS_CODE_RANGES=400-600 - SINK_HTTP_DATA_FORMAT=proto - SINK_HTTP_HEADERS=Content-Type:application/json - - SINK_HTTP_JSON_BODY_TEMPLATE={"mappedField":"$.field_one"} - SINK_HTTP_PARAMETER_SOURCE=Message - SINK_HTTP_PARAMETER_PLACEMENT=Header - SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS=stencil.One +# SINK_HTTP_SERVICE_URL=http://http-service.test.io +# SINK_HTTP_REQUEST_METHOD=put +# SINK_HTTP_REQUEST_TIMEOUT_MS=10000 +# SINK_HTTP_MAX_CONNECTIONS=10 +# SINK_HTTP_RETRY_STATUS_CODE_RANGES=400-600 +# SINK_HTTP_DATA_FORMAT=proto +# SINK_HTTP_HEADERS=Authorization:auth_token, Accept:text/plain +# SINK_HTTP_JSON_BODY_TEMPLATE={"test":"$.routes[0]", "$.order_number" : "xxx"} +# SINK_HTTP_PARAMETER_SOURCE=disabled +# SINK_HTTP_PARAMETER_PLACEMENT=Header +# SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS=com.tests.TestMessage # INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"order_number","2":"event_timestamp","3":"driver_id"} - SINK_HTTP_OAUTH2_ENABLE=false - SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL=https://sample-oauth.my-api.com/oauth2/token - SINK_HTTP_OAUTH2_CLIENT_NAME=client-name - SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret - SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info +# SINK_HTTP_OAUTH2_ENABLE=false +# SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL=https://sample-oauth.my-api.com/oauth2/token +# SINK_HTTP_OAUTH2_CLIENT_NAME=client-name +# SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret +# SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info +# SINK_HTTP_SERIALIZER_JSON_TYPECAST=[{"jsonPath": "$.root.field1", "type": "INTEGER"},{"jsonPath": "$.root.field2", "type": "INTEGER"},{"jsonPath": "$..field3", "type": "INTEGER"},{"jsonPath": "$..[*].field4", "type": "DOUBLE"}] # # ############################################# diff --git a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java index d72f9bdce..5883f690c 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java @@ -170,8 +170,4 @@ public Map getRetryStatusCodeRanges() { public Map getRequestLogStatusCodeRanges() { return requestLogStatusCodeRanges; } - - private void addFailedMessages() { - - } }