Skip to content

Commit b7793cf

Browse files
authored
Ignore failed batch with 'invalid event format' error (#242)
1 parent 38409de commit b7793cf

File tree

5 files changed

+58
-4
lines changed

5 files changed

+58
-4
lines changed

src/main/java/com/splunk/hecclient/HecAckPoller.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,22 @@ public void add(HecChannel channel, EventBatch batch, String response) {
130130
fail(channel, batch, ex);
131131
return;
132132
}
133+
134+
if (resp.getText() == "Invalid data format") {
135+
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
136+
batch.commit();
137+
List<EventBatch> committedBatches = new ArrayList<>();
138+
committedBatches.add(batch);
139+
pollerCallback.onEventCommitted(committedBatches);
140+
return;
141+
}
133142

134143
ConcurrentHashMap<Long, EventBatch> channelEvents = outstandingEventBatches.get(channel);
135144
if (channelEvents == null) {
136145
outstandingEventBatches.putIfAbsent(channel, new ConcurrentHashMap<>());
137146
channelEvents = outstandingEventBatches.get(channel);
138147
}
139-
148+
140149
if (channelEvents.get(resp.getAckId()) != null) {
141150
log.warn("ackId={} already exists for channel={} index={} data may be duplicated in Splunk", resp.getAckId(), channel, channel.getIndexer());
142151
return;

src/main/java/com/splunk/hecclient/Indexer.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,18 @@
2626
import org.apache.http.message.BasicHeader;
2727
import org.apache.http.protocol.HttpContext;
2828
import org.apache.http.util.EntityUtils;
29+
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import com.fasterxml.jackson.databind.node.ObjectNode;
31+
import com.fasterxml.jackson.databind.JsonNode;
32+
2933
import org.slf4j.Logger;
3034
import org.slf4j.LoggerFactory;
3135

3236
import java.io.IOException;
3337

3438
final class Indexer implements IndexerInf {
3539
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
40+
private static final ObjectMapper jsonMapper = new ObjectMapper();
3641

3742
private CloseableHttpClient httpClient;
3843
private HttpContext context;
@@ -165,8 +170,8 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
165170
throw new HecException("failed to close http response", ex);
166171
}
167172
}
168-
169-
// log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length);
173+
174+
//log.info("event posting, channel={}, cookies={}, cookies.length={}", channel, resp.getHeaders("Set-Cookie"), resp.getHeaders("Set-Cookie").length);
170175

171176
if((resp.getHeaders("Set-Cookie") != null) && (resp.getHeaders("Set-Cookie").length > 0)) {
172177
log.info("Sticky session expiry detected, will cleanup old channel and its associated batches");
@@ -181,7 +186,25 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
181186
}
182187

183188
log.error("failed to post events resp={}, status={}", respPayload, status);
184-
throw new HecException(String.format("failed to post events resp=%s, status=%d", respPayload, status));
189+
JsonNode jsonNode;
190+
try {
191+
jsonNode = jsonMapper.readTree(respPayload);
192+
} catch (Exception ex) {
193+
log.error("failed to parse response payload", ex);
194+
throw new HecException("failed to parse response payload", ex);
195+
}
196+
197+
String respText = (jsonNode.has("text")) ? jsonNode.get("text").asText() : null;
198+
199+
if (respText == "Invalid data format") {
200+
ObjectNode objNode = jsonMapper.createObjectNode();
201+
objNode.put("text", "Invalid data format");
202+
objNode.put("code", 0); // Mark it as success
203+
objNode.put("ackId", -1);
204+
respPayload = objNode.toString();
205+
} else {
206+
throw new HecException(String.format("failed to post events resp=%s, status=%d", respPayload, status));
207+
}
185208
}
186209

187210
clearBackPressure();

src/main/java/com/splunk/hecclient/ResponsePoller.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public void add(HecChannel channel, EventBatch batch, String resp) {
6868
fail(channel, batch, new HecException(response.getText()));
6969
return;
7070
}
71+
if (response.getText() == "Invalid data format") {
72+
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
73+
}
7174
} catch (Exception ex) {
7275
log.error("failed to parse response", resp, ex);
7376
fail(channel, batch, ex);

src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
3030
public static final String success = "{\"text\":\"Success\",\"code\":0,\"ackId\":2}";
3131
public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
3232
public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
33+
public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
3334
public static final String exception = "excpetion";
3435

3536
private String resp = "";

src/test/java/com/splunk/hecclient/IndexerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,24 @@ public void sendWithSuccess() {
112112
}
113113
}
114114

115+
@Test
116+
public void sendWithInvalidData() {
117+
CloseableHttpClientMock client = new CloseableHttpClientMock();
118+
client.setResponse(CloseableHttpClientMock.invalidDataFormat);
119+
PollerMock poller = new PollerMock();
120+
121+
Indexer indexer = new Indexer(baseUrl, token, client, poller);
122+
EventBatch batch = UnitUtil.createBatch();
123+
boolean result = indexer.send(batch);
124+
Assert.assertTrue(result);
125+
Assert.assertNotNull(poller.getBatch());
126+
Assert.assertNull(poller.getFailedBatch());
127+
Assert.assertNull(poller.getException());
128+
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
129+
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
130+
}
131+
132+
115133
@Test
116134
public void sendWithServerBusy() {
117135
CloseableHttpClientMock client = new CloseableHttpClientMock();

0 commit comments

Comments
 (0)