Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ public List<Message> execute() throws Exception {
printRequest(httpRequests.get(i), contentStringList);
}
if (shouldRetry(response)) {
failedMessages.add(sourceMessages.get(i));
if (sourceMessages.size() == httpRequests.size()) {
failedMessages.add(sourceMessages.get(i));
} else {
int batchSize = sourceMessages.size() / httpRequests.size();
int batchStartIndex = i * batchSize;
int batchEndIndex = Math.min((i + 1) * batchSize, sourceMessages.size());
failedMessages.addAll(sourceMessages.subList(batchStartIndex, batchEndIndex));
}
} else if (!Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(response.getStatusLine().getStatusCode())).matches()) {
contentStringList = contentStringList == null ? readContent(httpRequests.get(i)) : contentStringList;
captureMessageDropCount(response, contentStringList);
Expand Down
67 changes: 58 additions & 9 deletions src/test/java/com/gotocompany/firehose/sink/http/HttpSinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,14 @@ public class HttpSinkTest {
private Map<Integer, Boolean> requestLogStatusCodeRanges;

private List<Message> messages;
private final String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}";
private final Message message = new Message(null, jsonString.getBytes(), "", 0, 1);

@Before
public void setup() {
initMocks(this);

messages = new ArrayList<>();

String jsonString = "{\"customer_id\":\"544131618\",\"categories\":[{\"category\":\"COFFEE_SHOP\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"PIZZA_PASTA\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"ROTI\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"category\":\"FASTFOOD\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0}],\"merchants\":[{\"merchant_id\":\"542629489\",\"merchant_uuid\":\"62598e60-1e5b-497c-b971-5a2bb0efb745\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542777412\",\"merchant_uuid\":\"0a84a08b-8a53-47f4-9e62-7b7c2316dd08\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542675785\",\"merchant_uuid\":\"daf41597-27d4-4475-b7c7-4f11563adcdb\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1},{\"merchant_id\":\"542704646\",\"merchant_uuid\":\"9b522ca0-3ff0-4591-b60b-0e84b48d6d12\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":2000},{\"merchant_id\":\"542809106\",\"merchant_uuid\":\"b902f7ba-ab5e-4de1-9755-56648f556265\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0,\"days_since_last_order\":1}],\"brands\":[{\"brand_id\":\"e9f7c4b2-4fa6-489a-ab20-a1bb4638ad29\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"336eb59c-621a-4704-811c-e1024f970e2e\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"0f30e2ca-f97f-43ec-895c-0d9d729e4cca\",\"merchant_visits_4_weeks\":0,\"orders_4_weeks\":1,\"orders_24_weeks\":1,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"901af18e-f5b7-43c5-9e67-4906d6ccce51\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0},{\"brand_id\":\"da07057d-7fe1-47de-8713-4c1edcfc9afc\",\"merchant_visits_4_weeks\":1,\"orders_4_weeks\":0,\"orders_24_weeks\":0,\"allocated\":0.0,\"redeemed\":0.0}],\"orders_4_weeks\":2,\"orders_24_weeks\":2,\"merchant_visits_4_weeks\":4,\"app_version_major\":\"3\",\"app_version_minor\":\"30\",\"app_version_patch\":\"2\",\"current_country\":\"ID\",\"os\":\"Android\",\"wallet_id\":\"16230097256391350739\",\"dag_run_time\":\"2019-06-27T07:27:00+00:00\"}";
Message message = new Message(null, jsonString.getBytes(), "", 0, 1);

messages.add(message);
messages.add(message);
}
Expand Down Expand Up @@ -113,19 +111,69 @@ public void shouldReturnBackFailedMessagesWhenResponseCodeIsGivenRange() throws
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}"));
int expectedFailedMessage = messages.size();

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
httpSink.prepare(messages);
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
assertEquals(expectedFailedMessage, failedMessages.size());
}

@Test
public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception {
public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsBatched() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);
//Send messages in batch of two
List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut, httpPut);
messages.add(message);
messages.add(message);

when(httpPut.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpRequests);
when(httpClient.execute(httpPut)).thenReturn(response);
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}"));
int expectedFailedMessage = messages.size();

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
httpSink.prepare(messages);
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(expectedFailedMessage, failedMessages.size());
}

@Test
public void shouldReturnBackAllFailedMessagesWhenResponseCodeIsGivenRangeAndRequestIsIndividual() throws Exception {
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(500);
//Send messages in two batch each containing 1 message
List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut, httpPut);

when(httpPut.getURI()).thenReturn(new URI("http://dummy.com"));
when(request.build(messages)).thenReturn(httpRequests);
when(httpClient.execute(httpPut)).thenReturn(response);
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new StringInputStream("{\"key\":\"value\"}"));
int expectedFailedMessage = messages.size();

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-505"), requestLogStatusCodeRanges);
httpSink.prepare(messages);
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(expectedFailedMessage, failedMessages.size());
}

@Test
public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception {
List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut);

when(httpPut.getURI()).thenReturn(new URI("http://dummy.com"));
Expand All @@ -135,18 +183,18 @@ public void shouldReturnBackFailedMessagesWhenResponseIsNull() throws Exception
when(request.build(messages)).thenReturn(httpRequests);
when(httpClient.execute(httpPut)).thenReturn(null);
when(httpPut.getMethod()).thenReturn("PUT");
int expectedFailedMessageCount = messages.size();

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
assertEquals(expectedFailedMessageCount, failedMessages.size());
}

@Test
public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws Exception {

List<HttpEntityEnclosingRequestBase> httpRequests = Arrays.asList(httpPut);

when(httpPut.getURI()).thenReturn(new URI("http://dummy.com"));
Expand All @@ -157,13 +205,14 @@ public void shouldReturnBackFailedMessagesWhenResponseStatusCodeIsZero() throws
when(httpClient.execute(httpPut)).thenReturn(response);
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(0);
int expectedFailedMessage = messages.size();

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
List<Message> failedMessages = httpSink.execute();

assertFalse(failedMessages.isEmpty());
assertEquals(1, failedMessages.size());
assertEquals(expectedFailedMessage, failedMessages.size());
}

@Test(expected = IOException.class)
Expand Down
Loading