From 55085f50afc699f25979c6a5964ec01e67f7d697 Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Mon, 7 Sep 2020 14:27:48 +0530 Subject: [PATCH 01/11] HCD-387 Supported kerberos for Splunk Sink. --- src/main/java/com/splunk/hecclient/Hec.java | 15 ++++ .../splunk/hecclient/HttpClientBuilder.java | 43 ++++++++++ .../java/com/splunk/hecclient/Indexer.java | 79 ++++++++++++++++--- .../connect/SplunkSinkConnectorConfig.java | 35 +++++++- 4 files changed, 159 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index f170245a..e620191b 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -15,6 +15,7 @@ */ package com.splunk.hecclient; +import com.splunk.kafka.connect.SplunkSinkConnectorConfig; import org.apache.http.impl.client.CloseableHttpClient; import java.io.FileInputStream; @@ -264,6 +265,20 @@ public final void close() { public static CloseableHttpClient createHttpClient(final HecConfig config) { int poolSizePerDest = config.getMaxHttpConnectionPerChannel(); + if (!SplunkSinkConnectorConfig.kerberosPrincipal().isEmpty() + && !SplunkSinkConnectorConfig.kerberosKeytabLocation().isEmpty() + && !SplunkSinkConnectorConfig.kerberosUser().isEmpty() + ) { + try { + return (CloseableHttpClient) new HttpClientBuilder().buildKerberosClient(); + } catch (KeyStoreException e) { + e.printStackTrace(); + } catch (NoSuchAlgorithmException e) { + e.printStackTrace(); + } catch (KeyManagementException e) { + e.printStackTrace(); + } + } // Code block for default client construction if(!config.getHasCustomTrustStore() && StringUtils.isBlank(config.getTrustStorePath()) && diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java index 4047ca37..1bc5a562 100644 --- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java +++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java @@ -15,11 +15,21 @@ */ package com.splunk.hecclient; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.config.CookieSpecs; import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; import org.apache.http.config.SocketConfig; +import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.ssl.SSLContextBuilder; @@ -27,6 +37,10 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.Principal; import java.security.cert.X509Certificate; public final class HttpClientBuilder { @@ -87,6 +101,35 @@ public CloseableHttpClient build() { .build(); } + public HttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException { + org.apache.http.impl.client.HttpClientBuilder builder = + org.apache.http.impl.client.HttpClientBuilder.create(); + Lookup authSchemeRegistry = RegistryBuilder.create(). + register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build(); + builder.setDefaultAuthSchemeRegistry(authSchemeRegistry); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() { + @Override + public Principal getUserPrincipal() { + return null; + } + @Override + public String getPassword() { + return null; + } + }); + builder.setDefaultCredentialsProvider(credentialsProvider); + SSLContextBuilder sslContextBuilderbuilder = new SSLContextBuilder(); + sslContextBuilderbuilder.loadTrustMaterial(null, (chain, authType) -> true); + SSLConnectionSocketFactory sslsf = new + SSLConnectionSocketFactory( + sslContextBuilderbuilder.build(), NoopHostnameVerifier.INSTANCE); + + builder.setSSLSocketFactory(sslsf); + CloseableHttpClient httpClient = builder.build(); + return httpClient; + } + private SSLConnectionSocketFactory getSSLConnectionFactory() { if (disableSSLCertVerification) { return getUnsecureSSLConnectionSocketFactory(); diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 43611c23..0d8f2318 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -15,8 +15,10 @@ */ package com.splunk.hecclient; +import com.splunk.kafka.connect.SplunkSinkConnectorConfig; import org.apache.http.Header; import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.protocol.HttpClientContext; @@ -28,8 +30,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; +import java.security.Principal; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; final class Indexer implements IndexerInf { private static final Logger log = LoggerFactory.getLogger(Indexer.class); @@ -137,18 +148,62 @@ public boolean send(final EventBatch batch) { @Override public synchronized String executeHttpRequest(final HttpUriRequest req) { CloseableHttpResponse resp; - try { - resp = httpClient.execute(req, context); - } catch (Exception ex) { - logBackPressure(); - log.error("encountered io exception", ex); - throw new HecException("encountered exception when post data", ex); - } - - return readAndCloseResponse(resp); - } + if (!SplunkSinkConnectorConfig.kerberosPrincipal().isEmpty()) { + Configuration config = new Configuration() { + @SuppressWarnings("serial") + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return new AppConfigurationEntry[] { new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { + { + put("useTicketCache", "false"); + put("useKeyTab", "true"); + put("keyTab", SplunkSinkConnectorConfig.kerberosKeytabLocation()); + //Krb5 in GSS API needs to be refreshed so it does not throw the error + //Specified version of key is not available + put("refreshKrb5Config", "true"); + put("principal", SplunkSinkConnectorConfig.kerberosPrincipal()); + put("storeKey", "false"); + put("doNotPrompt", "true"); + put("isInitiator", "true"); + put("debug", "true"); + } + }) }; + } + }; + Set princ = new HashSet(1); + princ.add(new KerberosPrincipal(SplunkSinkConnectorConfig.kerberosUser())); + Subject sub = new Subject(false, princ, new HashSet(), new HashSet()); + try { + LoginContext lc = new LoginContext("", sub, null, config); + lc.login(); + Subject serviceSubject = lc.getSubject(); + return Subject.doAs(serviceSubject, new PrivilegedAction() { + HttpResponse httpResponse = null; + @Override + public HttpResponse run() { + try { + return httpClient.execute(req, context); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + return httpResponse; + } + }).toString(); + } catch (Exception le) { + le.printStackTrace();; + } + return null; + } else { + try { + resp = httpClient.execute(req, context); + } catch (Exception ex) { + logBackPressure(); + log.error("encountered io exception", ex); + throw new HecException("encountered exception when post data", ex); + } - private String readAndCloseResponse(CloseableHttpResponse resp) { + return readAndCloseResponse(resp);DCloseResponse(CloseableHttpResponse resp) { String respPayload; HttpEntity entity = resp.getEntity(); try { diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 0299b075..eacffae7 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -17,6 +17,7 @@ import com.splunk.hecclient.HecConfig; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -36,6 +37,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String INDEX_CONF = "splunk.indexes"; static final String SOURCE_CONF = "splunk.sources"; static final String SOURCETYPE_CONF = "splunk.sourcetypes"; + // Kerberos config + static final String KERBEROS_PRINCIPAL = "kerb.principal"; + static final String KERBEROS_USER = "kerb.user"; + static final String KERBEROS_KEYTAB_LOCATION = "kerb.keytab.location"; static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; @@ -161,6 +166,11 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype"; static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host"; + static final String KERBEROS_PRINCIPAL_DOC = "Kerberos principal"; + static final String KERBEROS_USER_DOC = "Kerberos user"; + static final String KERBEROS_KEYTAB_LOCATION_DOC = "Kerberos ketab"; + static final String KERBEROS_PASSWORD_DOC = "Kerberos password"; + final String splunkToken; final String splunkURI; final Map> topicMetas; @@ -203,6 +213,11 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String headerSourcetype; final String headerHost; + static String kerberosPrincipal; + static String kerberosUser; + static Password kerberosPassword; + static String kerberosKeytabLocation; + SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); splunkToken = getPassword(TOKEN_CONF).value(); @@ -239,6 +254,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { headerSource = getString(HEADER_SOURCE_CONF); headerSourcetype = getString(HEADER_SOURCETYPE_CONF); headerHost = getString(HEADER_HOST_CONF); + kerberosPrincipal = getString(KERBEROS_PRINCIPAL); + kerberosUser = getString(KERBEROS_USER); + kerberosKeytabLocation = getString(KERBEROS_KEYTAB_LOCATION); } public static ConfigDef conf() { @@ -274,7 +292,10 @@ public static ConfigDef conf() { .define(HEADER_INDEX_CONF, ConfigDef.Type.STRING, "splunk.header.index", ConfigDef.Importance.MEDIUM, HEADER_INDEX_DOC) .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) - .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC); + .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) + .define(KERBEROS_PRINCIPAL, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_PRINCIPAL_DOC) + .define(KERBEROS_USER, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_DOC) + .define(KERBEROS_KEYTAB_LOCATION, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC); } /** Configuration Method to setup all settings related to Splunk HEC Client @@ -405,4 +426,16 @@ private Map> initMetaMap(Map taskCon } return metaMap; } + + public static String kerberosPrincipal() { + return kerberosPrincipal; + } + + public static String kerberosUser() { + return kerberosUser; + } + + public static String kerberosKeytabLocation() { + return kerberosKeytabLocation; + } } From 59e920d51805e23bc9c14d8da801bf8be3c0d50b Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Mon, 7 Sep 2020 14:57:17 +0530 Subject: [PATCH 02/11] HCD-387 Improved codebase | removed unwanted variables. --- src/main/java/com/splunk/hecclient/Indexer.java | 6 +++++- .../com/splunk/kafka/connect/SplunkSinkConnectorConfig.java | 2 -- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 0d8f2318..b60c6b02 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -203,7 +203,11 @@ public HttpResponse run() { throw new HecException("encountered exception when post data", ex); } - return readAndCloseResponse(resp);DCloseResponse(CloseableHttpResponse resp) { + return readAndCloseResponse(resp); + } + } + + private String readAndCloseResponse(CloseableHttpResponse resp) { String respPayload; HttpEntity entity = resp.getEntity(); try { diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index eacffae7..7b63e5cd 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -169,7 +169,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String KERBEROS_PRINCIPAL_DOC = "Kerberos principal"; static final String KERBEROS_USER_DOC = "Kerberos user"; static final String KERBEROS_KEYTAB_LOCATION_DOC = "Kerberos ketab"; - static final String KERBEROS_PASSWORD_DOC = "Kerberos password"; final String splunkToken; final String splunkURI; @@ -215,7 +214,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static String kerberosPrincipal; static String kerberosUser; - static Password kerberosPassword; static String kerberosKeytabLocation; SplunkSinkConnectorConfig(Map taskConfig) { From baf39ad07b893d53f6bcc99b76c0acba144f6f58 Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Mon, 7 Sep 2020 15:58:28 +0530 Subject: [PATCH 03/11] HCD-387 Improved codebase. --- src/main/java/com/splunk/hecclient/Hec.java | 9 +++--- .../java/com/splunk/hecclient/HecConfig.java | 30 +++++++++++++++++++ .../java/com/splunk/hecclient/Indexer.java | 18 +++++++---- .../connect/SplunkSinkConnectorConfig.java | 24 +++++---------- 4 files changed, 53 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index e620191b..11704906 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -197,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) { for (int i = 0; i < config.getTotalChannels(); ) { for (String uri : config.getUris()) { - Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller); + Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller, config); indexer.setKeepAlive(config.getHttpKeepAlive()); loadBalancer.add(indexer.getChannel().setTracking(config.getEnableChannelTracking())); i++; @@ -264,10 +264,9 @@ public final void close() { */ public static CloseableHttpClient createHttpClient(final HecConfig config) { int poolSizePerDest = config.getMaxHttpConnectionPerChannel(); - - if (!SplunkSinkConnectorConfig.kerberosPrincipal().isEmpty() - && !SplunkSinkConnectorConfig.kerberosKeytabLocation().isEmpty() - && !SplunkSinkConnectorConfig.kerberosUser().isEmpty() + if (!config.kerberosPrincipal().isEmpty() + && !config.kerberosKeytabLocation().isEmpty() + && !config.kerberosUser().isEmpty() ) { try { return (CloseableHttpClient) new HttpClientBuilder().buildKerberosClient(); diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index 36cf1ea5..23bc04d5 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -34,6 +34,9 @@ public final class HecConfig { private boolean hasCustomTrustStore = false; private String trustStorePath; private String trustStorePassword; + private String kerberosPrincipal; + private String kerberosUser; + private String kerberosKeytabLocation; public HecConfig(List uris, String token) { this.uris = uris; @@ -155,6 +158,33 @@ public HecConfig setHasCustomTrustStore(boolean hasStore) { return this; } + public String kerberosPrincipal() { + return kerberosPrincipal; + } + + public HecConfig setKerberosPrincipal(String kerberosPrincipal) { + this.kerberosPrincipal = kerberosPrincipal; + return this; + } + + public String kerberosUser() { + return kerberosUser; + } + + public HecConfig setKerberosUser(String kerberosUser) { + this.kerberosUser = kerberosUser; + return this; + } + + public String kerberosKeytabLocation() { + return kerberosKeytabLocation; + } + + public HecConfig setKerberosKeytabLocation(String kerberosKeytabLocation) { + this.kerberosKeytabLocation = kerberosKeytabLocation; + return this; + } + public HecConfig setEnableChannelTracking(boolean trackChannel) { enableChannelTracking = trackChannel; return this; diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index b60c6b02..80ff1e2a 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -45,6 +45,7 @@ final class Indexer implements IndexerInf { private static final Logger log = LoggerFactory.getLogger(Indexer.class); + private HecConfig hecConfig; private CloseableHttpClient httpClient; private HttpContext context; private String baseUrl; @@ -58,14 +59,19 @@ final class Indexer implements IndexerInf { private long backPressureThreshhold = 60 * 1000; // 1 min // Indexer doesn't own client, ack poller - public Indexer(String baseUrl, String hecToken, CloseableHttpClient client, Poller poller) { + public Indexer( + String baseUrl, + String hecToken, + CloseableHttpClient client, + Poller poller, + HecConfig config) { this.httpClient = client; this.baseUrl = baseUrl; this.hecToken = hecToken; this.poller = poller; this.context = HttpClientContext.create(); backPressure = 0; - + this.hecConfig = config; channel = new HecChannel(this); // Init headers @@ -148,7 +154,7 @@ public boolean send(final EventBatch batch) { @Override public synchronized String executeHttpRequest(final HttpUriRequest req) { CloseableHttpResponse resp; - if (!SplunkSinkConnectorConfig.kerberosPrincipal().isEmpty()) { + if (!hecConfig.kerberosPrincipal().isEmpty()) { Configuration config = new Configuration() { @SuppressWarnings("serial") @Override @@ -158,11 +164,11 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) { { put("useTicketCache", "false"); put("useKeyTab", "true"); - put("keyTab", SplunkSinkConnectorConfig.kerberosKeytabLocation()); + put("keyTab", hecConfig.kerberosKeytabLocation()); //Krb5 in GSS API needs to be refreshed so it does not throw the error //Specified version of key is not available put("refreshKrb5Config", "true"); - put("principal", SplunkSinkConnectorConfig.kerberosPrincipal()); + put("principal", hecConfig.kerberosPrincipal()); put("storeKey", "false"); put("doNotPrompt", "true"); put("isInitiator", "true"); @@ -172,7 +178,7 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) { } }; Set princ = new HashSet(1); - princ.add(new KerberosPrincipal(SplunkSinkConnectorConfig.kerberosUser())); + princ.add(new KerberosPrincipal(hecConfig.kerberosUser())); Subject sub = new Subject(false, princ, new HashSet(), new HashSet()); try { LoginContext lc = new LoginContext("", sub, null, config); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 7b63e5cd..6482723a 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -17,7 +17,6 @@ import com.splunk.hecclient.HecConfig; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -212,9 +211,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String headerSourcetype; final String headerHost; - static String kerberosPrincipal; - static String kerberosUser; - static String kerberosKeytabLocation; + final String kerberosPrincipal; + final String kerberosUser; + final String kerberosKeytabLocation; SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); @@ -311,7 +310,10 @@ public HecConfig getHecConfig() { .setEnableChannelTracking(trackData) .setTrustStorePath(trustStorePath) .setTrustStorePassword(trustStorePassword) - .setHasCustomTrustStore(hasTrustStorePath); + .setHasCustomTrustStore(hasTrustStorePath) + .setKerberosPrincipal(kerberosPrincipal) + .setKerberosUser(kerberosUser) + .setKerberosKeytabLocation(kerberosKeytabLocation); return config; } @@ -424,16 +426,4 @@ private Map> initMetaMap(Map taskCon } return metaMap; } - - public static String kerberosPrincipal() { - return kerberosPrincipal; - } - - public static String kerberosUser() { - return kerberosUser; - } - - public static String kerberosKeytabLocation() { - return kerberosKeytabLocation; - } } From 1a6e53d64ae44294ca6ff003b79c09b748dce8a4 Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Mon, 7 Sep 2020 15:59:50 +0530 Subject: [PATCH 04/11] HCD-387 Improved testcases. --- .../java/com/splunk/hecclient/IndexerTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index f9ef3f72..274fcc0e 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -15,18 +15,23 @@ */ package com.splunk.hecclient; +import com.splunk.kafka.connect.SplunkSinkConnectorConfig; import org.apache.http.Header; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.util.Collections; + public class IndexerTest { private static final String baseUrl = "https://localhost:8088"; private static final String token = "mytoken"; @Test public void getHeaders() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, token, null, null, + new HecConfig(Collections.emptyList(), "")); Header[] headers = indexer.getHeaders(); Assert.assertEquals(3, headers.length); @@ -63,7 +68,8 @@ public void getHeaders() { @Test public void getterSetter() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, token, null, null, + new HecConfig(Collections.emptyList(), "")); Assert.assertEquals(baseUrl, indexer.getBaseUrl()); Assert.assertEquals(token, indexer.getToken()); @@ -74,7 +80,8 @@ public void getterSetter() { @Test public void toStr() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, token, null, null, + new HecConfig(Collections.emptyList(), "")); Assert.assertEquals(baseUrl, indexer.toString()); } @@ -87,7 +94,7 @@ public void sendWithSuccess() { } PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller); + Indexer indexer = new Indexer(baseUrl, token, client, poller, new HecConfig(Collections.emptyList(), "")); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertTrue(result); @@ -139,7 +146,7 @@ public void sendWithReadError() { private Indexer assertFailure(CloseableHttpClient client) { PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller); + Indexer indexer = new Indexer(baseUrl, token, client, poller, new HecConfig(Collections.emptyList(), "")); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertFalse(result); From 41c7e4a2a47a624a6545375a46ec412e4545fd8a Mon Sep 17 00:00:00 2001 From: Sumanthonnavar Date: Mon, 7 Sep 2020 16:09:25 +0530 Subject: [PATCH 05/11] fixed unit tests --- .../com/splunk/hecclient/IndexerTest.java | 22 +++++++++---------- .../java/com/splunk/hecclient/UnitUtil.java | 3 ++- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index 274fcc0e..943d5cce 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -15,23 +15,23 @@ */ package com.splunk.hecclient; -import com.splunk.kafka.connect.SplunkSinkConnectorConfig; +import java.util.Collections; + import org.apache.http.Header; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import java.util.Collections; - public class IndexerTest { private static final String baseUrl = "https://localhost:8088"; private static final String token = "mytoken"; + private static final HecConfig hecConfig = + new HecConfig(Collections.emptyList(), "") + .setKerberosPrincipal(""); @Test public void getHeaders() { - Indexer indexer = new Indexer(baseUrl, token, null, null, - new HecConfig(Collections.emptyList(), "")); + Indexer indexer = new Indexer(baseUrl, token, null, null, hecConfig); Header[] headers = indexer.getHeaders(); Assert.assertEquals(3, headers.length); @@ -68,8 +68,7 @@ public void getHeaders() { @Test public void getterSetter() { - Indexer indexer = new Indexer(baseUrl, token, null, null, - new HecConfig(Collections.emptyList(), "")); + Indexer indexer = new Indexer(baseUrl, token, null, null,hecConfig); Assert.assertEquals(baseUrl, indexer.getBaseUrl()); Assert.assertEquals(token, indexer.getToken()); @@ -80,8 +79,7 @@ public void getterSetter() { @Test public void toStr() { - Indexer indexer = new Indexer(baseUrl, token, null, null, - new HecConfig(Collections.emptyList(), "")); + Indexer indexer = new Indexer(baseUrl, token, null, null, hecConfig); Assert.assertEquals(baseUrl, indexer.toString()); } @@ -94,7 +92,7 @@ public void sendWithSuccess() { } PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller, new HecConfig(Collections.emptyList(), "")); + Indexer indexer = new Indexer(baseUrl, token, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertTrue(result); @@ -146,7 +144,7 @@ public void sendWithReadError() { private Indexer assertFailure(CloseableHttpClient client) { PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller, new HecConfig(Collections.emptyList(), "")); + Indexer indexer = new Indexer(baseUrl, token, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertFalse(result); diff --git a/src/test/java/com/splunk/hecclient/UnitUtil.java b/src/test/java/com/splunk/hecclient/UnitUtil.java index eff9181f..287c003f 100644 --- a/src/test/java/com/splunk/hecclient/UnitUtil.java +++ b/src/test/java/com/splunk/hecclient/UnitUtil.java @@ -24,7 +24,8 @@ public class UnitUtil { public static HecConfig createHecConfig() { - return new HecConfig(Arrays.asList("https://dummyhost:8088"), "token"); + return new HecConfig(Arrays.asList("https://dummyhost:8088"), "token") + .setKerberosPrincipal(""); } public static EventBatch createBatch() { From e33814aec89f55f7058d249cbe3cae1295340fa0 Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Tue, 8 Sep 2020 12:35:07 +0530 Subject: [PATCH 06/11] HCD-387 Resolved few review comments. --- src/main/java/com/splunk/hecclient/Hec.java | 15 ++--- .../java/com/splunk/hecclient/HecConfig.java | 5 ++ .../java/com/splunk/hecclient/Indexer.java | 60 +++++++++---------- .../connect/SplunkSinkConnectorConfig.java | 2 +- 4 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index 11704906..ad83c094 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -15,7 +15,6 @@ */ package com.splunk.hecclient; -import com.splunk.kafka.connect.SplunkSinkConnectorConfig; import org.apache.http.impl.client.CloseableHttpClient; import java.io.FileInputStream; @@ -29,6 +28,7 @@ import java.security.KeyManagementException; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.errors.ConnectException; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -264,18 +264,11 @@ public final void close() { */ public static CloseableHttpClient createHttpClient(final HecConfig config) { int poolSizePerDest = config.getMaxHttpConnectionPerChannel(); - if (!config.kerberosPrincipal().isEmpty() - && !config.kerberosKeytabLocation().isEmpty() - && !config.kerberosUser().isEmpty() - ) { + if (config.kerberosAuthEnabled()) { try { return (CloseableHttpClient) new HttpClientBuilder().buildKerberosClient(); - } catch (KeyStoreException e) { - e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } catch (KeyManagementException e) { - e.printStackTrace(); + } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) { + throw new ConnectException("Unable to build Kerberos Client", ex); } } // Code block for default client construction diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index 23bc04d5..70659d17 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -189,4 +189,9 @@ public HecConfig setEnableChannelTracking(boolean trackChannel) { enableChannelTracking = trackChannel; return this; } + + public boolean kerberosAuthEnabled() { + return !kerberosPrincipal().isEmpty() && !kerberosKeytabLocation().isEmpty() + && !kerberosUser().isEmpty(); + } } diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 80ff1e2a..f80b9f40 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -27,6 +27,7 @@ import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ final class Indexer implements IndexerInf { private static final Logger log = LoggerFactory.getLogger(Indexer.class); private HecConfig hecConfig; + private Configuration config; private CloseableHttpClient httpClient; private HttpContext context; private String baseUrl; @@ -154,29 +156,31 @@ public boolean send(final EventBatch batch) { @Override public synchronized String executeHttpRequest(final HttpUriRequest req) { CloseableHttpResponse resp; - if (!hecConfig.kerberosPrincipal().isEmpty()) { - Configuration config = new Configuration() { - @SuppressWarnings("serial") - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String name) { - return new AppConfigurationEntry[] { new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { - { - put("useTicketCache", "false"); - put("useKeyTab", "true"); - put("keyTab", hecConfig.kerberosKeytabLocation()); - //Krb5 in GSS API needs to be refreshed so it does not throw the error - //Specified version of key is not available - put("refreshKrb5Config", "true"); - put("principal", hecConfig.kerberosPrincipal()); - put("storeKey", "false"); - put("doNotPrompt", "true"); - put("isInitiator", "true"); - put("debug", "true"); - } - }) }; - } - }; + if (hecConfig.kerberosAuthEnabled()) { + if (config == null) { + config = new Configuration() { + @SuppressWarnings("serial") + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return new AppConfigurationEntry[]{new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { + { + put("useTicketCache", "false"); + put("useKeyTab", "true"); + put("keyTab", hecConfig.kerberosKeytabLocation()); + //Krb5 in GSS API needs to be refreshed so it does not throw the error + //Specified version of key is not available + put("refreshKrb5Config", "true"); + put("principal", hecConfig.kerberosPrincipal()); + put("storeKey", "false"); + put("doNotPrompt", "true"); + put("isInitiator", "true"); + put("debug", "true"); + } + })}; + } + }; + } Set princ = new HashSet(1); princ.add(new KerberosPrincipal(hecConfig.kerberosUser())); Subject sub = new Subject(false, princ, new HashSet(), new HashSet()); @@ -185,27 +189,23 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) { lc.login(); Subject serviceSubject = lc.getSubject(); return Subject.doAs(serviceSubject, new PrivilegedAction() { - HttpResponse httpResponse = null; @Override public HttpResponse run() { try { return httpClient.execute(req, context); - } catch (IOException ioe) { - ioe.printStackTrace(); + } catch (IOException ex) { + throw new ConnectException("Unable to execute HttpClient request", ex); } - return httpResponse; } }).toString(); } catch (Exception le) { - le.printStackTrace();; + throw new ConnectException("Unable to authenticate with kerberos server", le); } - return null; } else { try { resp = httpClient.execute(req, context); } catch (Exception ex) { logBackPressure(); - log.error("encountered io exception", ex); throw new HecException("encountered exception when post data", ex); } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 6482723a..ca906dd7 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -167,7 +167,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String KERBEROS_PRINCIPAL_DOC = "Kerberos principal"; static final String KERBEROS_USER_DOC = "Kerberos user"; - static final String KERBEROS_KEYTAB_LOCATION_DOC = "Kerberos ketab"; + static final String KERBEROS_KEYTAB_LOCATION_DOC = "Kerberos keytab"; final String splunkToken; final String splunkURI; From e84a5735f6a7731e7b9aee8814cf979fb02b7890 Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Tue, 8 Sep 2020 14:05:35 +0530 Subject: [PATCH 07/11] HCD-387 Added multi config validation. --- .../kafka/connect/SplunkSinkConnector.java | 9 +++++ .../connect/SplunkSinkConnectorConfig.java | 36 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index 70865868..a5754eb0 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -66,4 +66,13 @@ public String version() { public ConfigDef config() { return SplunkSinkConnectorConfig.conf(); } + + @Override + public Config validate(Map connectorConfigs) { + return new ConfigValidation( + config(), + connectorConfigs, + SplunkSinkConnectorConfig::validateKerberosConfigs + ).validate(); + } } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index ca906dd7..79718e6f 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -426,4 +426,40 @@ private Map> initMetaMap(Map taskCon } return metaMap; } + + protected static void validateKerberosConfigs(Map configs, ConfigValidationResult result) { + String kerberosKeytabLocation = + (String) configs.get(KERBEROS_KEYTAB_LOCATION); + String kerberosUser = + (String) configs.get(KERBEROS_USER); + String kerberosPrincipal = (String) configs.get(KERBEROS_PRINCIPAL); + + if (Strings.isNotEmpty(kerberosKeytabLocation) + && Strings.isNotEmpty(kerberosPrincipal) + && Strings.isNotEmpty(kerberosUser) + ) { + return; + } + + if (kerberosKeytabLocation.isEmpty() + && kerberosPrincipal.isEmpty() + && kerberosUser.isEmpty() + ) { + return; + } + + // If any of the SSL configuration is not + // empty then it must have set all other SSL configs + result.recordErrors( + String.format( + "%s, %s and %s are required to be configured for Kerberos authentication. ", + KERBEROS_USER, + KERBEROS_PRINCIPAL, + KERBEROS_KEYTAB_LOCATION + ), + KERBEROS_USER, + KERBEROS_PRINCIPAL, + KERBEROS_KEYTAB_LOCATION + ); + } } From dea44a42cd80f1771234934cd76ff1c682c263ab Mon Sep 17 00:00:00 2001 From: Sumanthonnavar Date: Tue, 8 Sep 2020 14:16:14 +0530 Subject: [PATCH 08/11] added connect-utils --- pom.xml | 5 +++++ .../java/com/splunk/kafka/connect/SplunkSinkConnector.java | 4 ++++ .../com/splunk/kafka/connect/SplunkSinkConnectorConfig.java | 6 ++++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index d8ea2ce0..c4feb7dc 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,11 @@ 3.7 compile + + io.confluent + connect-utils + 0.3.1 + diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index a5754eb0..671dcca9 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -16,6 +16,10 @@ package com.splunk.kafka.connect; import com.splunk.kafka.connect.VersionUtils; + +import io.confluent.connect.utils.validators.all.ConfigValidation; + +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 79718e6f..e442f562 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -16,6 +16,10 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.HecConfig; + +import io.confluent.connect.utils.Strings; +import io.confluent.connect.utils.validators.all.ConfigValidationResult; + import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.common.config.AbstractConfig; @@ -448,8 +452,6 @@ protected static void validateKerberosConfigs(Map configs, Confi return; } - // If any of the SSL configuration is not - // empty then it must have set all other SSL configs result.recordErrors( String.format( "%s, %s and %s are required to be configured for Kerberos authentication. ", From 5c6ed1d8d1193de771f57c741798f43a7908a51d Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Tue, 8 Sep 2020 17:00:30 +0530 Subject: [PATCH 09/11] HCD-387 Resolved few review comments. --- src/main/java/com/splunk/hecclient/HecConfig.java | 3 +-- src/main/java/com/splunk/hecclient/Indexer.java | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index 70659d17..b9997e3d 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -191,7 +191,6 @@ public HecConfig setEnableChannelTracking(boolean trackChannel) { } public boolean kerberosAuthEnabled() { - return !kerberosPrincipal().isEmpty() && !kerberosKeytabLocation().isEmpty() - && !kerberosUser().isEmpty(); + return !kerberosPrincipal().isEmpty(); } } diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index f80b9f40..c79adfbf 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -194,12 +194,13 @@ public HttpResponse run() { try { return httpClient.execute(req, context); } catch (IOException ex) { - throw new ConnectException("Unable to execute HttpClient request", ex); + logBackPressure(); + throw new HecException("encountered exception when post data", ex); } } }).toString(); } catch (Exception le) { - throw new ConnectException("Unable to authenticate with kerberos server", le); + throw new HecException("encountered exception when authenticating to kerberos", le); } } else { try { @@ -208,9 +209,8 @@ public HttpResponse run() { logBackPressure(); throw new HecException("encountered exception when post data", ex); } - - return readAndCloseResponse(resp); } + return readAndCloseResponse(resp); } private String readAndCloseResponse(CloseableHttpResponse resp) { From 410e4fe867ecd851a825f3e640c792f0877087fb Mon Sep 17 00:00:00 2001 From: Sumanthonnavar Date: Tue, 8 Sep 2020 18:13:26 +0530 Subject: [PATCH 10/11] improvements --- src/main/java/com/splunk/hecclient/Indexer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index c79adfbf..f3615bc1 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -188,9 +188,9 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String name) { LoginContext lc = new LoginContext("", sub, null, config); lc.login(); Subject serviceSubject = lc.getSubject(); - return Subject.doAs(serviceSubject, new PrivilegedAction() { + resp = Subject.doAs(serviceSubject, new PrivilegedAction() { @Override - public HttpResponse run() { + public CloseableHttpResponse run() { try { return httpClient.execute(req, context); } catch (IOException ex) { @@ -198,7 +198,7 @@ public HttpResponse run() { throw new HecException("encountered exception when post data", ex); } } - }).toString(); + }); } catch (Exception le) { throw new HecException("encountered exception when authenticating to kerberos", le); } From 13c91e7583ae17d37b2b779da1eebe2dd4b9ac5d Mon Sep 17 00:00:00 2001 From: deepesh1234 Date: Thu, 10 Sep 2020 01:20:36 +0530 Subject: [PATCH 11/11] HCD-387 Resolved few review comments. --- src/main/java/com/splunk/hecclient/Hec.java | 4 +- .../splunk/hecclient/HttpClientBuilder.java | 3 +- .../java/com/splunk/hecclient/Indexer.java | 56 ++++++++++--------- .../connect/SplunkSinkConnectorConfig.java | 36 ++++++------ .../com/splunk/hecclient/IndexerTest.java | 10 ++-- 5 files changed, 56 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index ad83c094..e145ff51 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -197,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) { for (int i = 0; i < config.getTotalChannels(); ) { for (String uri : config.getUris()) { - Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller, config); + Indexer indexer = new Indexer(uri, httpClient, poller, config); indexer.setKeepAlive(config.getHttpKeepAlive()); loadBalancer.add(indexer.getChannel().setTracking(config.getEnableChannelTracking())); i++; @@ -266,7 +266,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { int poolSizePerDest = config.getMaxHttpConnectionPerChannel(); if (config.kerberosAuthEnabled()) { try { - return (CloseableHttpClient) new HttpClientBuilder().buildKerberosClient(); + return new HttpClientBuilder().buildKerberosClient(); } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) { throw new ConnectException("Unable to build Kerberos Client", ex); } diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java index 1bc5a562..7fe2d9a1 100644 --- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java +++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java @@ -18,7 +18,6 @@ import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; -import org.apache.http.client.HttpClient; import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.config.CookieSpecs; import org.apache.http.client.config.RequestConfig; @@ -101,7 +100,7 @@ public CloseableHttpClient build() { .build(); } - public HttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException { + public CloseableHttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException { org.apache.http.impl.client.HttpClientBuilder builder = org.apache.http.impl.client.HttpClientBuilder.create(); Lookup authSchemeRegistry = RegistryBuilder.create(). diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index f3615bc1..d4f481a5 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -63,13 +63,12 @@ final class Indexer implements IndexerInf { // Indexer doesn't own client, ack poller public Indexer( String baseUrl, - String hecToken, CloseableHttpClient client, Poller poller, HecConfig config) { this.httpClient = client; this.baseUrl = baseUrl; - this.hecToken = hecToken; + this.hecToken = config.getToken(); this.poller = poller; this.context = HttpClientContext.create(); backPressure = 0; @@ -158,28 +157,7 @@ public synchronized String executeHttpRequest(final HttpUriRequest req) { CloseableHttpResponse resp; if (hecConfig.kerberosAuthEnabled()) { if (config == null) { - config = new Configuration() { - @SuppressWarnings("serial") - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String name) { - return new AppConfigurationEntry[]{new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { - { - put("useTicketCache", "false"); - put("useKeyTab", "true"); - put("keyTab", hecConfig.kerberosKeytabLocation()); - //Krb5 in GSS API needs to be refreshed so it does not throw the error - //Specified version of key is not available - put("refreshKrb5Config", "true"); - put("principal", hecConfig.kerberosPrincipal()); - put("storeKey", "false"); - put("doNotPrompt", "true"); - put("isInitiator", "true"); - put("debug", "true"); - } - })}; - } - }; + defineKerberosConfigs(); } Set princ = new HashSet(1); princ.add(new KerberosPrincipal(hecConfig.kerberosUser())); @@ -195,12 +173,13 @@ public CloseableHttpResponse run() { return httpClient.execute(req, context); } catch (IOException ex) { logBackPressure(); - throw new HecException("encountered exception when post data", ex); + throw new HecException("Encountered exception while posting data.", ex); } } }); } catch (Exception le) { - throw new HecException("encountered exception when authenticating to kerberos", le); + throw new HecException( + "Encountered exception while authenticating via Kerberos.", le); } } else { try { @@ -213,6 +192,31 @@ public CloseableHttpResponse run() { return readAndCloseResponse(resp); } + private void defineKerberosConfigs() { + config = new Configuration() { + @SuppressWarnings("serial") + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return new AppConfigurationEntry[]{new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { + { + put("useTicketCache", "false"); + put("useKeyTab", "true"); + put("keyTab", hecConfig.kerberosKeytabLocation()); + //Krb5 in GSS API needs to be refreshed so it does not throw the error + //Specified version of key is not available + put("refreshKrb5Config", "true"); + put("principal", hecConfig.kerberosPrincipal()); + put("storeKey", "false"); + put("doNotPrompt", "true"); + put("isInitiator", "true"); + put("debug", "true"); + } + })}; + } + }; + } + private String readAndCloseResponse(CloseableHttpResponse resp) { String respPayload; HttpEntity entity = resp.getEntity(); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index e442f562..ad053805 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -41,9 +41,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String SOURCE_CONF = "splunk.sources"; static final String SOURCETYPE_CONF = "splunk.sourcetypes"; // Kerberos config - static final String KERBEROS_PRINCIPAL = "kerb.principal"; - static final String KERBEROS_USER = "kerb.user"; - static final String KERBEROS_KEYTAB_LOCATION = "kerb.keytab.location"; + static final String KERBEROS_PRINCIPAL_CONF = "kerb.principal"; + static final String KERBEROS_USER_CONF = "kerb.user"; + static final String KERBEROS_KEYTAB_LOCATION_CONF = "kerb.keytab.location"; static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; @@ -255,9 +255,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { headerSource = getString(HEADER_SOURCE_CONF); headerSourcetype = getString(HEADER_SOURCETYPE_CONF); headerHost = getString(HEADER_HOST_CONF); - kerberosPrincipal = getString(KERBEROS_PRINCIPAL); - kerberosUser = getString(KERBEROS_USER); - kerberosKeytabLocation = getString(KERBEROS_KEYTAB_LOCATION); + kerberosPrincipal = getString(KERBEROS_PRINCIPAL_CONF); + kerberosUser = getString(KERBEROS_USER_CONF); + kerberosKeytabLocation = getString(KERBEROS_KEYTAB_LOCATION_CONF); } public static ConfigDef conf() { @@ -294,9 +294,9 @@ public static ConfigDef conf() { .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) - .define(KERBEROS_PRINCIPAL, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_PRINCIPAL_DOC) - .define(KERBEROS_USER, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_DOC) - .define(KERBEROS_KEYTAB_LOCATION, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC); + .define(KERBEROS_PRINCIPAL_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_PRINCIPAL_DOC) + .define(KERBEROS_USER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_DOC) + .define(KERBEROS_KEYTAB_LOCATION_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC); } /** Configuration Method to setup all settings related to Splunk HEC Client @@ -433,10 +433,10 @@ private Map> initMetaMap(Map taskCon protected static void validateKerberosConfigs(Map configs, ConfigValidationResult result) { String kerberosKeytabLocation = - (String) configs.get(KERBEROS_KEYTAB_LOCATION); + (String) configs.get(KERBEROS_KEYTAB_LOCATION_CONF); String kerberosUser = - (String) configs.get(KERBEROS_USER); - String kerberosPrincipal = (String) configs.get(KERBEROS_PRINCIPAL); + (String) configs.get(KERBEROS_USER_CONF); + String kerberosPrincipal = (String) configs.get(KERBEROS_PRINCIPAL_CONF); if (Strings.isNotEmpty(kerberosKeytabLocation) && Strings.isNotEmpty(kerberosPrincipal) @@ -455,13 +455,13 @@ protected static void validateKerberosConfigs(Map configs, Confi result.recordErrors( String.format( "%s, %s and %s are required to be configured for Kerberos authentication. ", - KERBEROS_USER, - KERBEROS_PRINCIPAL, - KERBEROS_KEYTAB_LOCATION + KERBEROS_USER_CONF, + KERBEROS_PRINCIPAL_CONF, + KERBEROS_KEYTAB_LOCATION_CONF ), - KERBEROS_USER, - KERBEROS_PRINCIPAL, - KERBEROS_KEYTAB_LOCATION + KERBEROS_USER_CONF, + KERBEROS_PRINCIPAL_CONF, + KERBEROS_KEYTAB_LOCATION_CONF ); } } diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index 943d5cce..883e2727 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -31,7 +31,7 @@ public class IndexerTest { @Test public void getHeaders() { - Indexer indexer = new Indexer(baseUrl, token, null, null, hecConfig); + Indexer indexer = new Indexer(baseUrl, null, null, hecConfig); Header[] headers = indexer.getHeaders(); Assert.assertEquals(3, headers.length); @@ -68,7 +68,7 @@ public void getHeaders() { @Test public void getterSetter() { - Indexer indexer = new Indexer(baseUrl, token, null, null,hecConfig); + Indexer indexer = new Indexer(baseUrl, null, null,hecConfig); Assert.assertEquals(baseUrl, indexer.getBaseUrl()); Assert.assertEquals(token, indexer.getToken()); @@ -79,7 +79,7 @@ public void getterSetter() { @Test public void toStr() { - Indexer indexer = new Indexer(baseUrl, token, null, null, hecConfig); + Indexer indexer = new Indexer(baseUrl, null, null, hecConfig); Assert.assertEquals(baseUrl, indexer.toString()); } @@ -92,7 +92,7 @@ public void sendWithSuccess() { } PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller, hecConfig); + Indexer indexer = new Indexer(baseUrl, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertTrue(result); @@ -144,7 +144,7 @@ public void sendWithReadError() { private Indexer assertFailure(CloseableHttpClient client) { PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller, hecConfig); + Indexer indexer = new Indexer(baseUrl, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertFalse(result);