32 changes: 32 additions & 0 deletions .github/workflows/hdinsight-kafka-java-build.yaml
@@ -0,0 +1,32 @@
# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Java CI with Maven
on:
  push:
    branches:
      - main
  pull_request:
    types: [ opened, synchronize, reopened ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis

      - name: Set up JDK 1.8
        uses: actions/setup-java@v2
        with:
          java-version: 8
          distribution: 'adopt'

      - name: Build with Maven
        run: mvn -B package --file pom.xml
29 changes: 17 additions & 12 deletions DomainJoined-Producer-Consumer-With-TLS/README.md
@@ -111,12 +111,12 @@ properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
```

> **Note:**<br>
> The following properties are important for an ESP cluster with TLS encryption enabled, and they are critical to add in the `AdminClient`, `Producer`, and `Consumer`.
> Your ESP cluster might have both TLS encryption and TLS authentication enabled; change the configurations as described in
> [Enable TLS Encryption on ESP cluster](https://learn.microsoft.com/en-us/azure/hdinsight/kafka/apache-esp-kafka-ssl-encryption-authentication).
```java
// Set the TLS Encryption for Domain Joined TLS Encrypted cluster
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.setProperty("ssl.mechanism", "GSSAPI");
```

@@ -164,7 +164,7 @@

## <a id="run"></a> Run the example

1. Replace `sshuser` with the SSH user for your cluster, and replace `CLUSTERNAME` with the name of your cluster. Open an SSH connection to the cluster by entering the following command. If prompted, enter the password for the SSH user account.

Expand All @@ -181,7 +181,7 @@ This conversation was marked as resolved by piyushgupta
```bash
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
```

> **Note:**
> This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.
1. Create a Kafka topic, `myTest`, by entering the following command:

@@ -216,7 +216,7 @@
```bash
export password='<password>'
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
```
2. Create the keytab file for `espkafkauser` with the following steps:
```bash
ktutil
ktutil: addent -password -p espkafkauser@TEST.COM -k 1 -e RC4-HMAC
Expand All @@ -226,10 +226,10 @@ This conversation was marked as resolved by piyushgupta
```
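
To sanity-check the keytab before wiring it into the JAAS configuration, you can authenticate with it directly. This is an optional verification sketch, assuming the principal created above:

```bash
# Obtain a Kerberos ticket using the keytab, then inspect the ticket cache
kinit -kt espkafkauser.keytab espkafkauser@TEST.COM
klist
```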

**NOTE:**
1. `espkafkauser` should be part of your domain group; add it in the Ranger UI to grant CRUD operation privileges.
2. Keep the domain name `TEST.COM` in capital letters only. Otherwise, Kerberos will throw errors during CRUD operations.

You will have an `espkafkauser.keytab` file in the local directory. Now create an `espkafkauser_jaas.conf` JAAS config file with the data given below:

```
KafkaClient {
    ...
};
```

@@ -264,16 +264,21 @@
![](media/Kafk_Policy_UI.png)


4. Now edit the `alltopic` policy and add `espkafkauser` under **Select User** from the dropdown. Click **Save** after making the changes.

![](media/Edit_Policy_UI.png)

![](media/Add_User.png)

5. If the HDI version is `5.1`, edit the `allconsumer` policy and add `espkafkauser` under **Select User** from the dropdown. Click **Save** after making the changes.

![](media/Edit_AllConsumerPolicy.png)

![](media/Add_ESPKafkaUser.png)

6. If you cannot see the user in the dropdown, the user is not available in the AAD domain.

7. Now execute CRUD operations on the head node for verification:

```bash
# Sample command
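# Illustrative sketch only; the jar name, JAAS file path, and arguments below
# are assumptions and should be adjusted to your setup:
#   java -Djava.security.auth.login.config=espkafkauser_jaas.conf \
#        -jar kafka-producer-consumer-esp.jar producer myTest $KAFKABROKERS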
```
40 changes: 23 additions & 17 deletions DomainJoined-Producer-Consumer/README.md
@@ -4,8 +4,8 @@ languages: java
products:
- azure
- azure-hdinsight
description: "Examples in this repository demonstrate how to use the Kafka Consumer, Producer, and Streaming APIs with a Kerberized Kafka on HDInsight cluster."
urlFragment: hdinsight-kafka-java-get-started
description: "Examples in this repository demonstrate how to use the Kafka Consumer, Producer, and Streaming APIs with a Kerberized Kafka on HDInsight cluster."
urlFragment: hdinsight-kafka-java-get-started
---

# Java-based example of using the Kafka Consumer, Producer, and Streaming APIs
@@ -39,9 +39,9 @@ The important things to understand in the `pom.xml` file are:
```xml
<!-- Kafka client for producer/consumer operations -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
```

@@ -90,7 +90,8 @@ properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
```

Notice the important property added for an ESP cluster; it is critical to add this in the `AdminClient`, `Producer`, and `Consumer`:
`properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");`<br>
In this code, the consumer is configured to read from the start of the topic (`auto.offset.reset` is set to `earliest`).
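
Below is a minimal, self-contained sketch of how these consumer properties fit together (the class name, group id, and broker list are illustrative placeholders, not part of the sample):

```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

// Hypothetical helper showing the ESP-relevant consumer configuration.
public class EspConsumerConfigSketch {
    public static KafkaConsumer<String, String> create(String brokers) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The ESP-critical property: authenticate over SASL without TLS encryption.
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Start from the beginning of the topic when no committed offset exists.
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(properties);
    }
}
```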

### Run.java
Expand All @@ -101,16 +102,16 @@ The [Run.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started

Download the jars from the [Kafka Get Started Azure sample](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/tree/master/Prebuilt-Jars). If your cluster is **Enterprise Security Package (ESP)** enabled, use `kafka-producer-consumer-esp.jar`. Use the command below to copy the jars to your cluster.

```cmd
scp kafka-producer-consumer-esp.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
```

## Build the JAR files from code


If you would like to skip this step, prebuilt jars can be downloaded from the `Prebuilt-Jars` subdirectory. Download the `kafka-producer-consumer.jar`. If your cluster is **Enterprise Security Package (ESP)** enabled, use `kafka-producer-consumer-esp.jar`. Execute step 3 to copy the jar to your HDInsight cluster.

1. Download and extract the examples from [Hdinsight-Kafka-Java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started).

2. If you are using an **Enterprise Security Package (ESP)** enabled Kafka cluster, set the location to the `DomainJoined-Producer-Consumer` subdirectory. Use the following command to build the application (a sketch is shown below):
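
   A typical invocation is a standard Maven package build. The following is a sketch (an assumption; use the repository's documented command if it differs):

   ```bash
   cd DomainJoined-Producer-Consumer
   mvn clean package
   ```

   The built jar is then available under `target/`.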

Expand Down Expand Up @@ -189,10 +190,10 @@ This conversation was marked as resolved by anusricorp
```

**NOTE:**
1. `espkafkauser` should be part of your domain group; add it in the Ranger UI to grant CRUD operation privileges.
2. Keep the domain name `TEST.COM` in capital letters only. Otherwise, Kerberos will throw errors during CRUD operations.

You will have an `espkafkauser.keytab` file in the local directory. Now create an `espkafkauser_jaas.conf` JAAS config file with the data given below:

```
KafkaClient {
    ...
};
```

@@ -227,16 +228,21 @@
![](media/Kafk_Policy_UI.png)


4. Now edit the `alltopic` policy and add `espkafkauser` under **Select User** from the dropdown. Click **Save** after making the changes.

![](media/Edit_Policy_UI.png)

![](media/Add_User.png)

5. If the HDI version is `5.1`, edit the `allconsumer` policy and add `espkafkauser` under **Select User** from the dropdown. Click **Save** after making the changes.

![](media/Edit_AllConsumerPolicy.png)

![](media/Add_ESPKafkaUser.png)

6. If you cannot see the user in the dropdown, the user is not available in the AAD domain.

7. Now execute CRUD operations on the head node for verification:

```bash
# Sample command
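# Illustrative sketch only; the jar name, JAAS file path, and arguments below
# are assumptions and should be adjusted to your setup:
#   java -Djava.security.auth.login.config=espkafkauser_jaas.conf \
#        -jar kafka-producer-consumer-esp.jar producer myTest $KAFKABROKERS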
```
15 changes: 13 additions & 2 deletions DomainJoined-Producer-Consumer/pom.xml
@@ -5,11 +5,12 @@
<artifactId>kafka-producer-consumer-esp</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>kafka-producer-consumer-esp</name>
<url>http://maven.apache.org</url>
<properties>
<!-- This is the version in HDInsight 4.0 -->
<kafka.version>2.1.1</kafka.version>
<slf4j.version>2.0.7</slf4j.version>
</properties>
<dependencies>
<!-- Kafka client for producer/consumer operations -->
Expand All @@ -18,6 +19,16 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -1,26 +1,18 @@
package com.microsoft.example;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AdminClientWrapper {
    private static final Logger logger = LoggerFactory.getLogger(AdminClientWrapper.class);

    public static Properties getProperties(String brokers) {
        Properties properties = new Properties();
Expand All @@ -42,13 +34,10 @@ public static void describeTopics(String brokers, String topicName) throws IOExc
        try (final AdminClient adminClient = KafkaAdminClient.create(properties)) {
            // Make async call to describe the topic.
            final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
            TopicDescription description = describeTopicsResult.values().get(topicName).get();
            logger.info(description.toString());
        } catch (Exception exception) {
            logger.error("Describe Topic denied: ", exception);
        }
    }

Expand All @@ -59,11 +48,9 @@ public static void deleteTopics(String brokers, String topicName) throws IOExcep
        try (final AdminClient adminClient = KafkaAdminClient.create(properties)) {
            final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
            deleteTopicsResult.values().get(topicName).get();
            logger.info("Topic " + topicName + " deleted");
        } catch (Exception exception) {
            logger.error("Delete Topic denied: ", exception);
        }
    }

Expand All @@ -78,11 +65,9 @@ public static void createTopics(String brokers, String topicName) throws IOExcep

            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
            createTopicsResult.values().get(topicName).get();
            logger.info("Topic " + topicName + " created");
        } catch (Exception exception) {
            logger.error("Create Topics denied: ", exception);
        }
    }
}
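
For reference, a driver for this wrapper might look like the following sketch (the demo class and broker addresses are hypothetical, not part of the sample):

```java
// Hypothetical driver for AdminClientWrapper; placeholders throughout.
public class AdminClientWrapperDemo {
    public static void main(String[] args) throws Exception {
        String brokers = "wn0-kafka.example.com:9092,wn1-kafka.example.com:9092";
        AdminClientWrapper.createTopics(brokers, "myTest");   // create the topic
        AdminClientWrapper.describeTopics(brokers, "myTest"); // log its description
        AdminClientWrapper.deleteTopics(brokers, "myTest");   // clean up
    }
}
```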
@@ -6,8 +6,11 @@
import org.apache.kafka.clients.CommonClientConfigs;
import java.util.Properties;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    public static int consume(String brokers, String groupId, String topicName) {
        // Create a consumer
        KafkaConsumer<String, String> consumer;
@@ -45,7 +48,7 @@ public static int consume(String brokers, String groupId, String topicName) {
                for(ConsumerRecord<String, String> record: records) {
                    // Display record and count
                    count += 1;
                    logger.info(count + ": " + record.value());
                }
            }
        }
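
A call site for `consume` might look like this sketch (broker, group, and topic strings are placeholders):

```java
// Hypothetical call site for Consumer.consume; not part of the sample.
public class ConsumerDemo {
    public static void main(String[] args) {
        int count = Consumer.consume("wn0-kafka.example.com:9092", "mygroup", "myTest");
        System.out.println("Records consumed: " + count);
    }
}
```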
@@ -8,16 +8,18 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.Properties;
import java.util.Random;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer
{
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    public static void produce(String brokers, String topicName) throws IOException
    {

Expand All @@ -44,22 +46,21 @@ public static void produce(String brokers, String topicName) throws IOException
        };

        String progressAnimation = "|/-\\";
        int numberOfMessages = 100;
        // Produce a bunch of records
        for(int i = 0; i < numberOfMessages; i++) {
            // Pick a sentence at random
            String sentence = sentences[random.nextInt(sentences.length)];
            // Send the sentence to the test topic
            try {
                producer.send(new ProducerRecord<String, String>(topicName, sentence)).get();
            } catch (Exception exception) {
                logger.error("Exception while producing messages: ", exception);
                throw new IOException(exception.toString());
            }
            String progressBar = "\r" + progressAnimation.charAt(i % progressAnimation.length()) + " " + i;
            System.out.write(progressBar.getBytes());
        }
        logger.info("Produced Messages: " + numberOfMessages);
    }
}
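
And a matching call site for `produce` (again a sketch with placeholder values):

```java
import java.io.IOException;

// Hypothetical call site for Producer.produce; not part of the sample.
public class ProducerDemo {
    public static void main(String[] args) throws IOException {
        Producer.produce("wn0-kafka.example.com:9092", "myTest");
    }
}
```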