diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java index 8a0d4de326..142c2e9521 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.MAX_FRAME_LENGTH; import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER; +import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -23,6 +24,7 @@ import io.netty.handler.ssl.SslHandler; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils; +import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.util.ssl.SslContextFactory; /** @@ -50,13 +52,27 @@ public TransactionMarkerChannelInitializer(KafkaServiceConfiguration kafkaConfig @Override protected void initChannel(SocketChannel ch) throws Exception { - if (this.enableTls) { - ch.pipeline().addLast(TLS_HANDLER, - new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory))); - } ch.pipeline().addLast(lengthFieldPrepender); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4)); ch.pipeline().addLast("txnHandler", new TransactionMarkerChannelHandler(transactionMarkerChannelManager)); } + + protected CompletableFuture initTls(Channel ch, String host, int port) { + if (this.enableTls) { + CompletableFuture initTlsFuture = new CompletableFuture<>(); + ch.eventLoop().execute(() -> { + try { + ch.pipeline().addFirst(TLS_HANDLER, + new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory, host, port))); + initTlsFuture.complete(ch); + } catch (Throwable t) { + initTlsFuture.completeExceptionally(t); + } + }); + return initTlsFuture; + } else { + return CompletableFuture.completedFuture(ch); + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java index 310a79460b..4f29a3524a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java @@ -77,6 +77,8 @@ public class TransactionMarkerChannelManager { private final Bootstrap bootstrap; + private final TransactionMarkerChannelInitializer transactionMarkerChannelInitializer; + private final Map> handlerMap = new ConcurrentHashMap<>(); @@ -182,7 +184,9 @@ public TransactionMarkerChannelManager(String tenant, bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup)); - bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this)); + transactionMarkerChannelInitializer = + new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this); + bootstrap.handler(transactionMarkerChannelInitializer); } public CompletableFuture getChannel(InetSocketAddress socketAddress) { @@ -192,7 +196,10 @@ public CompletableFuture getChannel(InetSocketA ensureDrainQueuedTransactionMarkersActivity(); return handlerMap.computeIfAbsent(socketAddress, address -> { CompletableFuture handlerFuture = new CompletableFuture<>(); - ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress)) + ChannelFutures.toCompletableFuture(bootstrap.register()) + .thenCompose(ch -> transactionMarkerChannelInitializer + .initTls(ch, socketAddress.getHostString(), socketAddress.getPort())) + .thenCompose(ch -> ChannelFutures.toCompletableFuture(ch.connect(socketAddress))) .thenAccept(channel -> { handlerFuture.complete( (TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler")); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java index 616458391d..c8c2cef6bc 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java @@ -257,9 +257,10 @@ public static SSLEngine createSslEngine(SslContextFactory.Server sslContextFacto return engine; } - public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory) throws Exception { + public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory, + String host, int port) throws Exception { sslContextFactory.start(); - SSLEngine engine = sslContextFactory.newSSLEngine(); + SSLEngine engine = sslContextFactory.newSSLEngine(host, port); engine.setUseClientMode(true); return engine; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelTest.java index 7d7fbd785e..5a1504e17b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelTest.java @@ -84,20 +84,22 @@ public KafkaSSLChannelTest(final String entryFormat, boolean withCertHost) { * @param withCertHost the keystore with certHost or not. */ private void setSslConfigurations(boolean withCertHost) { - String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "2" : "") + "/"; - if (!withCertHost) { + String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "" : "2") + "/"; + if (withCertHost) { this.kopSslKeystoreLocation = path + "broker.keystore.jks"; this.kopSslKeystorePassword = "broker"; this.kopSslTruststoreLocation = path + "broker.truststore.jks"; this.kopSslTruststorePassword = "broker"; + this.kopClientTruststoreLocation = path + "broker.truststore.jks"; + this.kopClientTruststorePassword = "broker"; } else { this.kopSslKeystoreLocation = path + "server.keystore.jks"; this.kopSslKeystorePassword = "server"; this.kopSslTruststoreLocation = path + "server.truststore.jks"; this.kopSslTruststorePassword = "server"; + kopClientTruststorePassword = "client"; + kopClientTruststoreLocation = path + "client.truststore.jks"; } - kopClientTruststoreLocation = path + "client.truststore.jks"; - kopClientTruststorePassword = "client"; } @Factory @@ -111,6 +113,10 @@ public static Object[] instances() { } protected void sslSetUpForBroker() throws Exception { + + // require TLS verification when hostname is on certificate + conf.setTlsHostnameVerificationEnabled(withCertHost); + conf.setKafkaTransactionCoordinatorEnabled(true); conf.setKopTlsEnabledWithBroker(true); conf.setKopSslKeystoreType("JKS"); @@ -153,7 +159,7 @@ public void testKafkaProduceSSL() throws Exception { String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_"; @Cleanup - SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(), + SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(), withCertHost, kopClientTruststoreLocation, kopClientTruststorePassword); for (int i = 0; i < totalMsgs; i++) { @@ -188,7 +194,8 @@ public static class SslProducer implements Closeable { private final KafkaProducer producer; private final String topic; - public SslProducer(String topic, int port, String truststoreLocation, String truststorePassword) { + public SslProducer(String topic, int port, boolean withCertHost, String truststoreLocation, + String truststorePassword) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + port); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducerSSL"); @@ -201,7 +208,7 @@ public SslProducer(String topic, int port, String truststoreLocation, String tru props.put("ssl.truststore.password", truststorePassword); // default is https, here need to set empty. - props.put("ssl.endpoint.identification.algorithm", ""); + props.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : ""); producer = new KafkaProducer<>(props); this.topic = topic; @@ -233,7 +240,7 @@ public void basicProduceAndConsumeWithTxTest() throws Exception { producerProps.put("ssl.truststore.password", kopClientTruststorePassword); // default is https, here need to set empty. - producerProps.put("ssl.endpoint.identification.algorithm", ""); + producerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : ""); @Cleanup KafkaProducer producer = new KafkaProducer<>(producerProps); @@ -292,7 +299,7 @@ private void consumeTxData(String kafkaServer, String topicName, String isolatio consumerProps.put("ssl.truststore.password", kopClientTruststorePassword); // default is https, here need to set empty. - consumerProps.put("ssl.endpoint.identification.algorithm", ""); + consumerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : ""); @Cleanup KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelWithClientAuthTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelWithClientAuthTest.java index 57fb22f225..01dd5708a4 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelWithClientAuthTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/KafkaSSLChannelWithClientAuthTest.java @@ -15,7 +15,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import java.io.Closeable; import java.util.Properties; @@ -47,14 +46,11 @@ public class KafkaSSLChannelWithClientAuthTest extends KopProtocolHandlerTestBas static { final HostnameVerifier defaultHostnameVerifier = javax.net.ssl.HttpsURLConnection.getDefaultHostnameVerifier(); - final HostnameVerifier localhostAcceptedHostnameVerifier = new HostnameVerifier() { - - public boolean verify(String hostname, javax.net.ssl.SSLSession sslSession) { - if (hostname.equals("localhost")) { - return true; - } - return defaultHostnameVerifier.verify(hostname, sslSession); + final HostnameVerifier localhostAcceptedHostnameVerifier = (hostname, sslSession) -> { + if (hostname.equals("localhost")) { + return true; } + return defaultHostnameVerifier.verify(hostname, sslSession); }; javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier(localhostAcceptedHostnameVerifier); } @@ -71,13 +67,13 @@ public static Object[] instances() { }; } - protected void sslSetUpForBroker() throws Exception { - ((KafkaServiceConfiguration) conf).setKopSslClientAuth("required"); - ((KafkaServiceConfiguration) conf).setKopSslKeystoreType("JKS"); - ((KafkaServiceConfiguration) conf).setKopSslKeystoreLocation(kopSslKeystoreLocation); - ((KafkaServiceConfiguration) conf).setKopSslKeystorePassword(kopSslKeystorePassword); - ((KafkaServiceConfiguration) conf).setKopSslTruststoreLocation(kopSslTruststoreLocation); - ((KafkaServiceConfiguration) conf).setKopSslTruststorePassword(kopSslTruststorePassword); + protected void sslSetUpForBroker() { + conf.setKopSslClientAuth("required"); + conf.setKopSslKeystoreType("JKS"); + conf.setKopSslKeystoreLocation(kopSslKeystoreLocation); + conf.setKopSslKeystorePassword(kopSslKeystorePassword); + conf.setKopSslTruststoreLocation(kopSslTruststoreLocation); + conf.setKopSslTruststorePassword(kopSslTruststorePassword); } @BeforeMethod @@ -161,9 +157,6 @@ public SslProducer(String topic, int port) { props.put("ssl.keystore.location", "./src/test/resources/ssl/certificate/client.keystore.jks"); props.put("ssl.keystore.password", "client"); - // default is https, here need to set empty. - props.put("ssl.endpoint.identification.algorithm", ""); - producer = new KafkaProducer<>(props); this.topic = topic; } diff --git a/tests/src/test/resources/ssl/certificate/broker.keystore.jks b/tests/src/test/resources/ssl/certificate/broker.keystore.jks index 063b4f8b6d..ce1bc84a65 100644 Binary files a/tests/src/test/resources/ssl/certificate/broker.keystore.jks and b/tests/src/test/resources/ssl/certificate/broker.keystore.jks differ diff --git a/tests/src/test/resources/ssl/certificate/broker.truststore.jks b/tests/src/test/resources/ssl/certificate/broker.truststore.jks index dc062946bb..851da10b1f 100644 Binary files a/tests/src/test/resources/ssl/certificate/broker.truststore.jks and b/tests/src/test/resources/ssl/certificate/broker.truststore.jks differ diff --git a/tests/src/test/resources/ssl/certificate/client.keystore.jks b/tests/src/test/resources/ssl/certificate/client.keystore.jks index 9d61ecef52..7e5a856173 100644 Binary files a/tests/src/test/resources/ssl/certificate/client.keystore.jks and b/tests/src/test/resources/ssl/certificate/client.keystore.jks differ diff --git a/tests/src/test/resources/ssl/certificate/client.truststore.jks b/tests/src/test/resources/ssl/certificate/client.truststore.jks index e8b93df696..5c5b6374fc 100644 Binary files a/tests/src/test/resources/ssl/certificate/client.truststore.jks and b/tests/src/test/resources/ssl/certificate/client.truststore.jks differ