diff --git a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java index 39388e3e1..5883f690c 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/AbstractHttpSink.java @@ -61,7 +61,14 @@ public List execute() throws Exception { printRequest(httpRequests.get(i), contentStringList); } if (shouldRetry(response)) { - failedMessages.add(sourceMessages.get(i)); + if (sourceMessages.size() == httpRequests.size()) { + failedMessages.add(sourceMessages.get(i)); + } else { + int batchSize = sourceMessages.size() / httpRequests.size(); + int batchStartIndex = i * batchSize; + int batchEndIndex = Math.min((i + 1) * batchSize, sourceMessages.size()); + failedMessages.addAll(sourceMessages.subList(batchStartIndex, batchEndIndex)); + } } else if (!Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(response.getStatusLine().getStatusCode())).matches()) { contentStringList = contentStringList == null ? readContent(httpRequests.get(i)) : contentStringList; captureMessageDropCount(response, contentStringList); diff --git a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java index 13ca165e9..ed08d4f49 100644 --- a/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java @@ -60,16 +60,14 @@ public class HttpSinkTest { private Map requestLogStatusCodeRanges; private List messages; + private final String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}"; + private final Message message = new Message(null, jsonString.getBytes(), "", 0, 1); @Before public void setup() { initMocks(this); messages = new ArrayList<>(); - - String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}"; - Message message = new Message(null, jsonString.getBytes(), "", 0, 1); - messages.add(message); messages.add(message); } @@ -113,6 +111,7 @@ public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); when(response.getEntity()).thenReturn(httpEntity); when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); @@ -120,12 +119,61 @@ public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessage, failedMessages.size()); } @Test - public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception { + public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsBatched() throws Exception { + when(response.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(500); + //Send messages in batch of two + List httpRequests = Arrays.asList(httpPut, httpPut); + messages.add(message); + messages.add(message); + + when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); + when(request.build(messages)).thenReturn(httpRequests); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); + + HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, + new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); + httpSink.prepare(messages); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(expectedFailedMessage, failedMessages.size()); + } + @Test + public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsIndividual() throws Exception { + when(response.getStatusLine()).thenReturn(statusLine); + when(statusLine.getStatusCode()).thenReturn(500); + //Send messages in two batch each containing 1 message + List httpRequests = Arrays.asList(httpPut, httpPut); + + when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); + when(request.build(messages)).thenReturn(httpRequests); + when(httpClient.execute(httpPut)).thenReturn(response); + when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")}); + when(response.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}")); + int expectedFailedMessage = messages.size(); + + HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, + new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges); + httpSink.prepare(messages); + List failedMessages = httpSink.execute(); + + assertFalse(failedMessages.isEmpty()); + assertEquals(expectedFailedMessage, failedMessages.size()); + } + + @Test + public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception { List httpRequests = Arrays.asList(httpPut); when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); @@ -135,18 +183,18 @@ public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception when(request.build(messages)).thenReturn(httpRequests); when(httpClient.execute(httpPut)).thenReturn(null); when(httpPut.getMethod()).thenReturn("PUT"); + int expectedFailedMessageCount = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessageCount, failedMessages.size()); } @Test public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws Exception { - List httpRequests = Arrays.asList(httpPut); when(httpPut.getURI()).thenReturn(new URI("http://dummy.com")); @@ -157,13 +205,14 @@ public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws when(httpClient.execute(httpPut)).thenReturn(response); when(response.getStatusLine()).thenReturn(statusLine); when(statusLine.getStatusCode()).thenReturn(0); + int expectedFailedMessage = messages.size(); HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges); httpSink.prepare(messages); List failedMessages = httpSink.execute(); assertFalse(failedMessages.isEmpty()); - assertEquals(1, failedMessages.size()); + assertEquals(expectedFailedMessage, failedMessages.size()); } @Test(expected = IOException.class)