diff --git a/.github/workflows/hdinsight-kafka-java-build.yaml b/.github/workflows/hdinsight-kafka-java-build.yaml new file mode 100644 index 0000000..c0fc97f --- /dev/null +++ b/.github/workflows/hdinsight-kafka-java-build.yaml @@ -0,0 +1,32 @@ +# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Java CI with Maven +on: + push: + branches: + - main + pull_request: + types: [ opened, synchronize, reopened ] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis + + - name: Set up JDK 1.8 + uses: actions/setup-java@v2 + with: + java-version: 8 + distribution: 'adopt' + + - name: Build with Maven + run: mvn -B package --file pom.xml diff --git a/DomainJoined-Producer-Consumer-With-TLS/README.md b/DomainJoined-Producer-Consumer-With-TLS/README.md index b956cb2..1c8b9ef 100644 --- a/DomainJoined-Producer-Consumer-With-TLS/README.md +++ b/DomainJoined-Producer-Consumer-With-TLS/README.md @@ -111,12 +111,12 @@ properties.setProperty("auto.offset.reset","earliest"); consumer = new KafkaConsumer<>(properties); ``` -#### Note: -The important properties added for ESP with TLS Encryption enabled cluster. +> **Note:**
+> The following important properties are added for an ESP cluster with TLS Encryption enabled. They are critical to set on the `AdminClient`, `Producer`, and `Consumer`. Your ESP cluster might have both TLS Encryption and Authentication; adjust the configuration as described in [Enable TLS Encryption on ESP cluster](https://learn.microsoft.com/en-us/azure/hdinsight/kafka/apache-esp-kafka-ssl-encryption-authentication).

-```
+```java
// Set the TLS Encryption for Domain Joined TLS Encrypted cluster
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.setProperty("ssl.mechanism", "GSSAPI");
@@ -164,7 +164,6 @@ Download the jars from the [Kafka Get Started Azure sample](https://github.com/A
   ```

## Run the example
-This conversation was marked as resolved by piyushgupta

1. Replace `sshuser` with the SSH user for your cluster, and replace `CLUSTERNAME` with the name of your cluster. Open an SSH connection to the cluster, by entering the following command. If prompted, enter the password for the SSH user account.

@@ -181,7 +180,7 @@ This conversation was marked as resolved by piyushgupta
   export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
   ```

-   > **Note**
+   > **Note:**
   This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.

1. Create Kafka topic, `myTest`, by entering the following command:

@@ -216,7 +215,7 @@ This conversation was marked as resolved by piyushgupta
   export password=''
   export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
   ```

-2. Create the keytab file for espkafkauser with below steps
+2. Create the keytab file for `espkafkauser` with the steps below:
   ```bash
   ktutil
   ktutil: addent -password -p espkafkauser@TEST.COM -k 1 -e RC4-HMAC
   ```

   **NOTE:-**
-1. espkafkauser should be part of your domain group and add it in RangerUI to give CRUD operations privileges.
-2. Keep this domain name (TEST.COM) in capital only. Otherwise, kerberos will throw errors at the time of CRUD operations.
+1. `espkafkauser` must be part of your domain group; add it in the Ranger UI to grant CRUD operation privileges.
+2. Keep the domain name (`TEST.COM`) in capital letters only. Otherwise, Kerberos will throw errors during CRUD operations.

-You will be having an espkafkauser.keytab file in local directory. Now create an espkafkauser_jaas.conf jaas config file with data given below
+You will now have an `espkafkauser.keytab` file in the local directory. Next, create an `espkafkauser_jaas.conf` JAAS config file with the data given below:

```
KafkaClient {
@@ -264,16 +263,21 @@ KafkaClient {

![](media/Kafk_Policy_UI.png)

-4. Now edit the alltopic policy and add espkafkauser in selectuser from dropdown. Click on save policy after changes
+4. Now edit the `alltopic` policy and add `espkafkauser` under **Select User** in the dropdown. Click **Save Policy** after making the changes.

![](media/Edit_Policy_UI.png)

![](media/Add_User.png)

+5. 
If the HDI version is `5.1`, also edit the `allconsumer` policy and add `espkafkauser` under **Select User** in the dropdown. Click **Save Policy** after making the changes.
+
+![](media/Edit_AllConsumerPolicy.png)
+
+![](media/Add_ESPKafkaUser.png)

-5. If we are not able to see our user in dropdown then that mean that user is not available in AAD domain.
+6. If you are not able to see your user in the dropdown, it means the user is not available in the AAD domain.

-6. Now Execute CRUD operations in head node for verification
+7. Now execute CRUD operations on the head node for verification

```bash
# Sample command
diff --git a/DomainJoined-Producer-Consumer-With-TLS/media/Add_ESPKafkaUser.png b/DomainJoined-Producer-Consumer-With-TLS/media/Add_ESPKafkaUser.png
new file mode 100644
index 0000000..7e02ccb
Binary files /dev/null and b/DomainJoined-Producer-Consumer-With-TLS/media/Add_ESPKafkaUser.png differ
diff --git a/DomainJoined-Producer-Consumer-With-TLS/media/Edit_AllConsumerPolicy.png b/DomainJoined-Producer-Consumer-With-TLS/media/Edit_AllConsumerPolicy.png
new file mode 100644
index 0000000..80894b3
Binary files /dev/null and b/DomainJoined-Producer-Consumer-With-TLS/media/Edit_AllConsumerPolicy.png differ
diff --git a/DomainJoined-Producer-Consumer/README.md b/DomainJoined-Producer-Consumer/README.md
index 09378e5..bdb5a9b 100644
--- a/DomainJoined-Producer-Consumer/README.md
+++ b/DomainJoined-Producer-Consumer/README.md
@@ -4,8 +4,8 @@ languages: java
products:
- azure
- azure-hdinsight
description: "Examples in this repository demonstrate how to use the Kafka Consumer, Producer, and Streaming APIs with a Kerberized Kafka on HDInsight cluster."
urlFragment: hdinsight-kafka-java-get-started
---

# Java-based example of using the Kafka Consumer, Producer, and Streaming APIs
@@ -39,9 +39,9 @@ The important things to understand in the `pom.xml` file are:

```xml
-    <groupId>org.apache.kafka</groupId>
-    <artifactId>kafka-clients</artifactId>
-    <version>${kafka.version}</version>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.version}</version>
```

@@ -90,7 +90,8 @@ properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
```

-Notice the important property added for ESP cluster. This is critical to add in AdminClient, Producer and Consumer. properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+Notice the important property added for the ESP cluster. It is critical to set this on the `AdminClient`, `Producer`, and `Consumer`:
+`properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");`
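
Since the same security setting must be applied to each client type, it can be convenient to centralize it. A minimal sketch of that idea (the `EspClientConfig` class and its method names are illustrative, not part of the sample):

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;

public class EspClientConfig {

    // Apply the security protocol that every client (AdminClient, Producer,
    // Consumer) needs on an ESP (Kerberized) HDInsight cluster.
    public static Properties withEspSecurity(Properties properties) {
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        return properties;
    }

    // Example: create an AdminClient with the ESP setting applied.
    public static AdminClient espAdminClient(String brokers) {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return KafkaAdminClient.create(withEspSecurity(properties));
    }
}
```

The same `withEspSecurity` call can wrap the `Properties` handed to `KafkaProducer` and `KafkaConsumer`, so the protocol cannot silently be forgotten on one of the three clients.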
In this code, the consumer is configured to read from the start of the topic (`auto.offset.reset` is set to `earliest`.)

### Run.java
@@ -101,16 +102,16 @@ The [Run.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started

Download the jars from the [Kafka Get Started Azure sample](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/tree/master/Prebuilt-Jars). If your cluster is **Enterprise Security Package (ESP)** enabled, use kafka-producer-consumer-esp.jar. Use the command below to copy the jars to your cluster.

-   ```cmd
-   scp kafka-producer-consumer-esp.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
-   ```
+```cmd
+scp kafka-producer-consumer-esp.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
+```

## Build the JAR files from code

-If you would like to skip this step, prebuilt jars can be downloaded from the `Prebuilt-Jars` subdirectory. Download the kafka-producer-consumer.jar. If your cluster is **Enterprise Security Package (ESP)** enabled, use kafka-producer-consumer-esp.jar. Execute step 3 to copy the jar to your HDInsight cluster.
+If you would like to skip this step, prebuilt jars can be downloaded from the `Prebuilt-Jars` subdirectory. Download the `kafka-producer-consumer.jar`. If your cluster is **Enterprise Security Package (ESP)** enabled, use `kafka-producer-consumer-esp.jar`. Execute step 3 to copy the jar to your HDInsight cluster.

-1. Download and extract the examples from [https://github.com/Azure-Samples/hdinsight-kafka-java-get-started](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started).
+1. Download and extract the examples from [Hdinsight-Kafka-Java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started).

2. If you are using **Enterprise Security Package (ESP)** enabled Kafka cluster, you should set the location to `DomainJoined-Producer-Consumer` subdirectory. Use the following command to build the application:

@@ -189,10 +190,10 @@ This conversation was marked as resolved by anusricorp
   ```

   **NOTE:-**
-1. espkafkauser should be part of your domain group and add it in RangerUI to give CRUD operations privileges.
-2. Keep this domain name (TEST.COM) in capital only. Otherwise, kerberos will throw errors at the time of CRUD operations.
+1. `espkafkauser` must be part of your domain group; add it in the Ranger UI to grant CRUD operation privileges.
+2. Keep the domain name (`TEST.COM`) in capital letters only. Otherwise, Kerberos will throw errors during CRUD operations.

-You will be having an espkafkauser.keytab file in local directory. Now create an espkafkauser_jaas.conf jaas config file with data given below
+You will now have an `espkafkauser.keytab` file in the local directory. Next, create an `espkafkauser_jaas.conf` JAAS config file with the data given below:

```
KafkaClient {
@@ -227,16 +228,21 @@ KafkaClient {

![](media/Kafk_Policy_UI.png)

-4. Now edit the alltopic policy and add espkafkauser in selectuser from dropdown. Click on save policy after changes
+4. Now edit the `alltopic` policy and add `espkafkauser` under **Select User** in the dropdown. Click **Save Policy** after making the changes.

![](media/Edit_Policy_UI.png)

![](media/Add_User.png)

+5. If the HDI version is `5.1`, also edit the `allconsumer` policy and add `espkafkauser` under **Select User** in the dropdown. Click **Save Policy** after making the changes.
+
+![](media/Edit_AllConsumerPolicy.png)
+
+![](media/Add_ESPKafkaUser.png)

-5. 
If we are not able to see our user in dropdown then that mean that user is not available in AAD domain.
+6. If you are not able to see your user in the dropdown, it means the user is not available in the AAD domain.

-6. Now Execute CRUD operations in head node for verification
+7. Now execute CRUD operations on the head node for verification

```bash
# Sample command
diff --git a/DomainJoined-Producer-Consumer/media/Add_ESPKafkaUser.png b/DomainJoined-Producer-Consumer/media/Add_ESPKafkaUser.png
new file mode 100644
index 0000000..7e02ccb
Binary files /dev/null and b/DomainJoined-Producer-Consumer/media/Add_ESPKafkaUser.png differ
diff --git a/DomainJoined-Producer-Consumer/media/Edit_AllConsumerPolicy.png b/DomainJoined-Producer-Consumer/media/Edit_AllConsumerPolicy.png
new file mode 100644
index 0000000..80894b3
Binary files /dev/null and b/DomainJoined-Producer-Consumer/media/Edit_AllConsumerPolicy.png differ
diff --git a/DomainJoined-Producer-Consumer/pom.xml b/DomainJoined-Producer-Consumer/pom.xml
index c425e1c..1e521d8 100644
--- a/DomainJoined-Producer-Consumer/pom.xml
+++ b/DomainJoined-Producer-Consumer/pom.xml
@@ -5,11 +5,12 @@
  <artifactId>kafka-producer-consumer-esp</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
-  <name>kafka-producer-consumer</name>
+  <name>kafka-producer-consumer-esp</name>
  <url>http://maven.apache.org</url>

  <properties>
-    <kafka.version>2.1.1</kafka.version>
+    <kafka.version>2.1.1</kafka.version>
+    <slf4j.version>2.0.7</slf4j.version>
  </properties>
@@ -18,6 +19,16 @@
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
  </dependencies>
diff --git a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java
index 0787aec..04097a3 100644
--- a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java
+++ b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java
@@ -1,26 +1,18 @@
package com.microsoft.example;

-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.admin.NewTopic;
-
-import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-
-import java.util.Collection;
+import java.io.IOException;
import java.util.Collections;
-import java.util.concurrent.ExecutionException;
import java.util.Properties;
-import java.util.Random;
-import java.io.IOException;

public class AdminClientWrapper {
+    private static final Logger logger = LoggerFactory.getLogger(AdminClientWrapper.class);

    public static Properties getProperties(String brokers) {
        Properties properties = new Properties();
@@ -42,13 +34,10 @@ public static void describeTopics(String brokers, String topicName) throws IOExc
       try (final AdminClient adminClient = KafkaAdminClient.create(properties)) {
           // Make async call to describe the topic.
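           // (describeTopics() itself returns immediately; the DescribeTopicsResult wraps a
           //  KafkaFuture per topic, and the .get() call below blocks until the broker
           //  responds, or throws, e.g. when Ranger denies the describe operation.)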
final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); - TopicDescription description = describeTopicsResult.values().get(topicName).get(); - System.out.print(description.toString()); - } catch (Exception e) { - System.out.print("Describe denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info(description.toString()); + } catch (Exception exception) { + logger.error("Describe Topic denied: ", exception); } } @@ -59,11 +48,9 @@ public static void deleteTopics(String brokers, String topicName) throws IOExcep try (final AdminClient adminClient = KafkaAdminClient.create(properties)) { final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); deleteTopicsResult.values().get(topicName).get(); - System.out.print("Topic " + topicName + " deleted"); - } catch (Exception e) { - System.out.print("Delete Topics denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info("Topic " + topicName + " deleted"); + } catch (Exception exception) { + logger.error("Delete Topic denied: ", exception); } } @@ -78,11 +65,9 @@ public static void createTopics(String brokers, String topicName) throws IOExcep final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); createTopicsResult.values().get(topicName).get(); - System.out.print("Topic " + topicName + " created"); - } catch (Exception e) { - System.out.print("Create Topics denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info("Topic " + topicName + " created"); + } catch (Exception exception) { + logger.error("Create Topics denied: ", exception); } } } diff --git a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java index 8e15708..e9b6991 100644 --- a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java +++ b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java @@ -6,8 +6,11 @@ import org.apache.kafka.clients.CommonClientConfigs; import java.util.Properties; import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Consumer { + private static final Logger logger = LoggerFactory.getLogger(Consumer.class); public static int consume(String brokers, String groupId, String topicName) { // Create a consumer KafkaConsumer consumer; @@ -45,7 +48,7 @@ public static int consume(String brokers, String groupId, String topicName) { for(ConsumerRecord record: records) { // Display record and count count += 1; - System.out.println( count + ": " + record.value()); + logger.info(count + ": " + record.value()); } } } diff --git a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Producer.java b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Producer.java index b352b03..ba85b84 100644 --- a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Producer.java +++ b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Producer.java @@ -8,16 +8,18 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.TopicDescription; - import java.util.Collection; import java.util.Collections; import 
java.util.concurrent.ExecutionException; import java.util.Properties; import java.util.Random; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Producer { + private static final Logger logger = LoggerFactory.getLogger(Producer.class); public static void produce(String brokers, String topicName) throws IOException { @@ -44,22 +46,21 @@ public static void produce(String brokers, String topicName) throws IOException }; String progressAnimation = "|/-\\"; + int numberOfMessages = 100; // Produce a bunch of records - for(int i = 0; i < 100; i++) { + for(int i = 0; i < numberOfMessages; i++) { // Pick a sentence at random String sentence = sentences[random.nextInt(sentences.length)]; // Send the sentence to the test topic - try - { + try { producer.send(new ProducerRecord(topicName, sentence)).get(); - } - catch (Exception ex) - { - System.out.print(ex.getMessage()); - throw new IOException(ex.toString()); + } catch (Exception exception) { + logger.error("Exception while producing messages: ", exception); + throw new IOException(exception.toString()); } String progressBar = "\r" + progressAnimation.charAt(i % progressAnimation.length()) + " " + i; System.out.write(progressBar.getBytes()); } + logger.info("Produced Messages: " + numberOfMessages); } } diff --git a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Run.java b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Run.java index ab165ea..90abab5 100644 --- a/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Run.java +++ b/DomainJoined-Producer-Consumer/src/main/java/com/microsoft/example/Run.java @@ -5,9 +5,11 @@ import java.io.PrintWriter; import java.io.File; import java.lang.Exception; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // Handle starting producer or consumer public class Run { + private static final Logger logger = LoggerFactory.getLogger(Run.class); public static void main(String[] args) throws IOException { if(args.length < 3) { usage(); @@ -45,8 +47,7 @@ public static void main(String[] args) throws IOException { } // Display usage public static void usage() { - System.out.println("Usage:"); - System.out.println("kafka-example.jar brokerhosts [groupid]"); + logger.info("Usage: \n kafka-example.jar brokerhosts [groupid]"); System.exit(1); } } diff --git a/Prebuilt-Jars/kafka-producer-consumer-esp.jar b/Prebuilt-Jars/kafka-producer-consumer-esp.jar index afa8868..8cb7b01 100644 Binary files a/Prebuilt-Jars/kafka-producer-consumer-esp.jar and b/Prebuilt-Jars/kafka-producer-consumer-esp.jar differ diff --git a/Prebuilt-Jars/kafka-producer-consumer-tls-esp.jar b/Prebuilt-Jars/kafka-producer-consumer-tls-esp.jar index 409a8a1..8f46178 100644 Binary files a/Prebuilt-Jars/kafka-producer-consumer-tls-esp.jar and b/Prebuilt-Jars/kafka-producer-consumer-tls-esp.jar differ diff --git a/Prebuilt-Jars/kafka-producer-consumer.jar b/Prebuilt-Jars/kafka-producer-consumer.jar index 8f78039..3f469a2 100644 Binary files a/Prebuilt-Jars/kafka-producer-consumer.jar and b/Prebuilt-Jars/kafka-producer-consumer.jar differ diff --git a/Producer-Consumer/pom.xml b/Producer-Consumer/pom.xml index b2de84b..8dd6b71 100644 --- a/Producer-Consumer/pom.xml +++ b/Producer-Consumer/pom.xml @@ -9,15 +9,26 @@ http://maven.apache.org - 2.1.1 + 2.1.1 + 2.0.7 - - org.apache.kafka - kafka-clients - ${kafka.version} - + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + 
slf4j-simple + ${slf4j.version} + diff --git a/Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java b/Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java index 89db025..06e018c 100644 --- a/Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java +++ b/Producer-Consumer/src/main/java/com/microsoft/example/AdminClientWrapper.java @@ -18,9 +18,12 @@ import java.util.Properties; import java.util.Random; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AdminClientWrapper { + private static final Logger logger = LoggerFactory.getLogger(AdminClientWrapper.class); public static Properties getProperties(String brokers) { Properties properties = new Properties(); @@ -42,13 +45,10 @@ public static void describeTopics(String brokers, String topicName) throws IOExc try (final AdminClient adminClient = KafkaAdminClient.create(properties)) { // Make async call to describe the topic. final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); - TopicDescription description = describeTopicsResult.values().get(topicName).get(); - System.out.print(description.toString()); - } catch (Exception e) { - System.out.print("Describe denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info(description.toString()); + } catch (Exception exception) { + logger.error("Describe Topic denied: ", exception); } } @@ -59,11 +59,9 @@ public static void deleteTopics(String brokers, String topicName) throws IOExcep try (final AdminClient adminClient = KafkaAdminClient.create(properties)) { final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); deleteTopicsResult.values().get(topicName).get(); - System.out.print("Topic " + topicName + " deleted"); - } catch (Exception e) { - System.out.print("Delete Topics denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info("Topic " + topicName + " deleted"); + } catch (Exception exception) { + logger.error("Delete Topic denied: ", exception); } } @@ -78,11 +76,9 @@ public static void createTopics(String brokers, String topicName) throws IOExcep final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); createTopicsResult.values().get(topicName).get(); - System.out.print("Topic " + topicName + " created"); - } catch (Exception e) { - System.out.print("Create Topics denied\n"); - System.out.print(e.getMessage()); - //throw new RuntimeException(e.getMessage(), e); + logger.info("Topic " + topicName + " created"); + } catch (Exception exception) { + logger.error("Create Topics denied: ", exception); } } } diff --git a/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java b/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java index fad158a..fc70095 100644 --- a/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java +++ b/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java @@ -3,10 +3,16 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; + import java.util.Properties; import java.util.Arrays; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; public class Consumer { + private static final Logger logger = LoggerFactory.getLogger(Consumer.class); public static int consume(String brokers, String groupId, String topicName) { // Create a consumer KafkaConsumer consumer; @@ -44,7 +50,7 @@ public static int consume(String brokers, String groupId, String topicName) { for(ConsumerRecord record: records) { // Display record and count count += 1; - System.out.println( count + ": " + record.value()); + logger.info( count + ": " + record.value()); } } } diff --git a/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java b/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java index 49ccdf5..6319610 100644 --- a/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java +++ b/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java @@ -8,16 +8,18 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.TopicDescription; - import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.Properties; import java.util.Random; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Producer { + private static final Logger logger = LoggerFactory.getLogger(Producer.class); public static void produce(String brokers, String topicName) throws IOException { @@ -44,22 +46,21 @@ public static void produce(String brokers, String topicName) throws IOException }; String progressAnimation = "|/-\\"; + int numberOfMessages = 100; // Produce a bunch of records - for(int i = 0; i < 100; i++) { + for(int i = 0; i < numberOfMessages; i++) { // Pick a sentence at random String sentence = sentences[random.nextInt(sentences.length)]; // Send the sentence to the test topic - try - { + try { producer.send(new ProducerRecord(topicName, sentence)).get(); - } - catch (Exception ex) - { - System.out.print(ex.getMessage()); - throw new IOException(ex.toString()); + } catch (Exception exception) { + logger.error("Exception while producing messages: ", exception); + throw new IOException(exception.toString()); } String progressBar = "\r" + progressAnimation.charAt(i % progressAnimation.length()) + " " + i; System.out.write(progressBar.getBytes()); } + logger.info("Produced Messages: " + numberOfMessages); } } diff --git a/Producer-Consumer/src/main/java/com/microsoft/example/Run.java b/Producer-Consumer/src/main/java/com/microsoft/example/Run.java index d820971..f8362cd 100644 --- a/Producer-Consumer/src/main/java/com/microsoft/example/Run.java +++ b/Producer-Consumer/src/main/java/com/microsoft/example/Run.java @@ -5,9 +5,11 @@ import java.io.PrintWriter; import java.io.File; import java.lang.Exception; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // Handle starting producer or consumer public class Run { + private static final Logger logger = LoggerFactory.getLogger(Run.class); public static void main(String[] args) throws IOException { if(args.length < 3) { usage(); @@ -45,8 +47,7 @@ public static void main(String[] args) throws IOException { } // Display usage public static void usage() { - System.out.println("Usage:"); - System.out.println("kafka-example.jar brokerhosts [groupid]"); + logger.info("Usage: \n kafka-example.jar brokerhosts [groupid]"); System.exit(1); } } diff --git a/README.md b/README.md index d643fc5..cd4dea4 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,9 @@ There are 
two projects included in this repository:

* Producer-Consumer: This contains a producer and consumer that use a Kafka topic named `test`.

-* Streaming: This contains an application that uses the Kafka streaming API (in Kafka 0.10.0 or higher) that reads data from the `test` topic, splits the data into words, and writes a count of words into the `wordcounts` topic.
+* Streaming: This contains an application that uses the Kafka Streams API (in Kafka 2.1.1 or higher) to read data from the `test` topic, split the data into words, and write a count of words into the `wordcounts` topic.

-NOTE: This both projects assume Kafka 0.10.0, which is available with Kafka on HDInsight cluster version 3.6.
+NOTE: Both projects assume Kafka 2.1.1 or higher, which is available with Kafka on HDInsight cluster version 4.0 or higher.

## Producer and Consumer

@@ -46,13 +46,13 @@ To run the consumer and producer example, use the following steps:

6. Use SSH to connect to the cluster:

-        ssh USERNAME@CLUSTERNAME
+        ssh USERNAME@CLUSTERNAME.DOMAINNAME

7. Use the following commands in the SSH session to get the Zookeeper hosts and Kafka brokers for the cluster. You need this information when working with Kafka. Note that JQ is also installed, as it makes it easier to parse the JSON returned from Ambari. Replace __PASSWORD__ with the login (admin) password for the cluster. Replace __KAFKANAME__ with the name of the Kafka on HDInsight cluster.

        sudo apt -y install jq
        export KAFKAZKHOSTS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
        export KAFKABROKERS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`

8. Use the following to verify that the environment variables have been correctly populated:
@@ -68,13 +68,22 @@ To run the consumer and producer example, use the following steps:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

-   NOTE: This information may change as you perform scaling operations on the cluster, as this adds and removes worker nodes. You should always retrieve the Zookeeper and Broker information before working with Kafka.
+   **NOTE:** This information may change as you perform scaling operations on the cluster, as this adds and removes worker nodes. You should always retrieve the Zookeeper and Broker information before working with Kafka.

-   IMPORTANT: You don't have to provide all broker or Zookeeper nodes. A connection to one broker or Zookeeper node can be used to learn about the others. In this example, the list of hosts is trimmed to two entries.
-
+   **IMPORTANT:**
+   1. You don't have to provide all broker or Zookeeper nodes. A connection to one broker or Zookeeper node can be used to learn about the others. In this example, the list of hosts is trimmed to two entries.
+   2. If the HDI version is `4.0` or `5.0`, use the `--zookeeper` parameter in all Kafka CLI commands.
+   3. If the HDI version is `5.1`, use the `--bootstrap-server` parameter in all Kafka CLI commands.
+
9. This example uses a topic named `test`. 
Use the following to create this topic: - - /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS +- **If HDI version is 4.0 or 5.0**
+ ```shell + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS + ``` +- **If HDI version is 5.1**
+ ```shell + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --bootstrap-server $KAFKABROKERS + ``` 10. Use the producer-consumer example to write records to the topic: @@ -90,7 +98,7 @@ To run the consumer and producer example, use the following steps: ## Streaming -NOTE: The streaming example expects that you have already setup the `test` topic from the previous section. +**NOTE:** The streaming example expects that you have already set up the `test` topic from the previous section. 1. On your development environment, change to the `Streaming` directory and use the following to create a jar for this project: @@ -103,15 +111,22 @@ NOTE: The streaming example expects that you have already setup the `test` topic Replace **SSHUSER** with the SSH user for your cluster, and replace **CLUSTERNAME** with the name of your cluster. When prompted enter the password for the SSH user. 3. Once the file has been uploaded, return to the SSH connection to your HDInsight cluster and use the following commands to create the `wordcounts` and `wordcount-example-Counts-changelog` topics: - - /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS - /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS +- **If HDI version is 4.0 or 5.0**
+ ```shell + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS + ``` +- **If HDI version is 5.1**
+ ```shell + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcounts --bootstrap-server $KAFKABROKERS + /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic wordcount-example-Counts-changelog --bootstrap-server $KAFKABROKERS + ``` 4. Use the following command to start the streaming process in the background: java -jar kafka-streaming.jar $KAFKABROKERS 2>/dev/null & -4. While it is running, use the producer to send messages to the `test` topic: +5. While it is running, use the producer to send messages to the `test` topic: java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS &>/dev/null & diff --git a/Streaming/pom.xml b/Streaming/pom.xml index 630ef66..7640e21 100644 --- a/Streaming/pom.xml +++ b/Streaming/pom.xml @@ -8,7 +8,8 @@ kafka-streaming http://maven.apache.org - 0.10.0.0 + 2.1.1 + 2.0.7 @@ -16,6 +17,22 @@ kafka-streams ${kafka.version} + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + org.apache.kafka + kafka-clients + 2.1.1 + compile + diff --git a/Streaming/src/main/java/com/microsoft/example/Stream.java b/Streaming/src/main/java/com/microsoft/example/Stream.java index c8a7e67..fa6d17e 100644 --- a/Streaming/src/main/java/com/microsoft/example/Stream.java +++ b/Streaming/src/main/java/com/microsoft/example/Stream.java @@ -2,11 +2,10 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import java.util.Arrays; import java.util.Properties; @@ -15,30 +14,30 @@ public class Stream { public static void main( String[] args ) { Properties streamsConfig = new Properties(); + String applicationID = "wordcount-example"; + String inputTopic = "test"; + String outputTopic = "wordcounts"; // The name must be unique on the Kafka cluster - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example"); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationID); // Brokers streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]); // Zookeeper //streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, args[1]); // SerDes for key and values - streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - // Serdes for the word and count - Serde stringSerde = Serdes.String(); - Serde longSerde = Serdes.Long(); - - KStreamBuilder builder = new KStreamBuilder(); - KStream sentences = builder.stream(stringSerde, stringSerde, "test"); - KStream wordCounts = sentences + StreamsBuilder builder = new StreamsBuilder(); + KStream sentences = builder.stream(inputTopic); + KTable wordCounts = sentences .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .map((key, word) -> new KeyValue<>(word, word)) - 
.countByKey("Counts") - .toStream(); - wordCounts.to(stringSerde, longSerde, "wordcounts"); + .groupBy((key,word) -> word) + .count(); + wordCounts.toStream() + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder, streamsConfig); + Topology topology = builder.build(); + KafkaStreams streams = new KafkaStreams(topology, streamsConfig); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f6b7570 --- /dev/null +++ b/pom.xml @@ -0,0 +1,164 @@ + + + 4.0.0 + com.microsoft.azure.hdinsight + hdinsight-kafka + 1.0-SNAPSHOT + Kafka Jars for HdInsight + pom + + + 2.1.1 + 2.0.7 + 4.0.0 + 3.0.0-M2 + 3.2.0 + 1.6.0 + 2.5.3 + 3.1.1 + 1.2.7 + + + DomainJoined-Producer-Consumer + DomainJoined-Producer-Consumer-With-TLS + Producer-Consumer + Streaming + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.apache.kafka + kafka-streams + ${kafka.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + + + + + + + com.github.spotbugs + spotbugs-maven-plugin + ${spotbugs.version} + + Max + + + + package + + check + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${enforcer.version} + + + + + + + + + + enforce + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + org.apache.maven.plugins + maven-dependency-plugin + ${maven-dependency-plugin.version} + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/dependencies + false + false + true + + + + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + org.codehaus.mojo + flatten-maven-plugin + ${flatten-maven-plugin.version} + + true + resolveCiFriendliesOnly + + + + flatten + process-resources + + flatten + + + + flatten.clean + clean + + clean + + + + + + + \ No newline at end of file
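
For reference, here is the migrated word-count topology from `Stream.java` above as one self-contained class. The generic type parameters do not survive the diff formatting above, so `String` keys and values are assumed here, matching the sample's default serdes; everything else mirrors the patch.

```java
package com.microsoft.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class Stream {
    public static void main(String[] args) {
        Properties streamsConfig = new Properties();
        // The application ID must be unique on the Kafka cluster.
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Broker list, passed as the first argument (e.g. $KAFKABROKERS).
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // Default serdes for keys and values.
        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        // Read sentences from the `test` topic, split them into words, and count per word.
        KStream<String, String> sentences = builder.stream("test");
        KTable<String, Long> wordCounts = sentences
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        // Write the running counts to the `wordcounts` topic.
        wordCounts.toStream()
            .to("wordcounts", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```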