
Conversation

@shroffk
Member

@shroffk shroffk commented Nov 14, 2025

Create a new compose file to deploy a Kafka 4 cluster

Checklist

  • Testing:

    • The feature has automated tests
    • Tests were run
    • If not, explain how you tested your changes
  • Documentation:

    • The feature is documented
    • The documentation is up to date
    • Release notes:
      • Added an entry if the change is breaking or significant
      • Added an entry when adding a new feature

@shroffk shroffk self-assigned this Nov 14, 2025
@shroffk shroffk added the wip work in progress label Nov 14, 2025
@kasemir
Collaborator

kasemir commented Nov 14, 2025

When using a cluster, make sure that each alarm client receives all messages!

Based on the cluster setup, messages could be spread across partitions. Each message would get to a consumer, but not all consumers get all messages. So clients would see only some of the alarm updates.
https://medium.com/@martin.hodges/starting-out-with-kafka-clusters-topics-partitions-and-brokers-c9fbe4ed1642

To get all messages, the KafkaHelper creates each consumer with its own unique "group.id"
https://github.com/ControlSystemStudio/phoebus/blame/master/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java

// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();

... but an update three years ago that added a kafka_properties preference, b7021d6, changed that into

if (!kafka_props.containsKey("group.id")) {
    // API requires for Consumer to be in a group.
    // Each alarm client must receive all updates,
    // cannot balance updates across a group
    // --> Use unique group for each client
    final String group_id = "Alarm-" + UUID.randomUUID();
    kafka_props.put("group.id", group_id);
}

If you put several alarm clients into the same "group.id", the alarm updates will be distributed across your alarm clients. With N alarm clients, each client will only see 1/Nth of the updates, and then you wonder why each client shows a different alarm state. But yes, it will be more efficient, because each alarm client only needs to handle 1/Nth of the messages.
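
For illustration, this is roughly what an alarm-client-style consumer with its own unique group.id looks like (just a sketch against the plain Kafka consumer API; the bootstrap server and topic name below are placeholders, not the actual alarm configuration):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UniqueGroupConsumerDemo
{
    public static void main(final String[] args)
    {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Unique group per client: nobody else shares this group,
        // so this consumer receives messages from ALL partitions of the topic
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Alarm-" + UUID.randomUUID());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
        {
            consumer.subscribe(List.of("Accelerator")); // placeholder topic name
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.key() + " -> " + record.value());
        }
    }
}

Two such consumers, each with its own random group, will both receive every update; two consumers sharing one group will split the updates between them.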

@georgweiss
Collaborator

@kasemir, not sure I understand your comment... Are you saying that a group.id should not be specified in the properties if we run a cluster, i.e. that each client should start with a random id?

@kasemir
Collaborator

kasemir commented Nov 14, 2025

If all else fails, check the documentation.

[Screenshot of Kafka documentation on consumer groups and partitions, 2025-11-14]

When you have a cluster, and data is spread across partitions, each update is sent to one consumer in a group. That's great if a message represents a task, and one of the consumers needs to handle it. You can place many consumers in a group, and the work is spread across the consumers in the group.

But for alarms, we need EVERY alarm client (=consumer) to get ALL the alarm messages. Otherwise you wonder why your cluster efficiently only sends 1/Nth of the updates to each of the N alarm clients.

So set a unique group.id for each consumer; don't put one into the common properties.

@kasemir
Collaborator

kasemir commented Nov 14, 2025

See also the first rule of clustering: simply enabling "the cluster" is more likely to add problems than to increase reliability.

@georgweiss
Collaborator

@kasemir, thanks for the documentation.
To confirm: since KafkaHelper will assign a random group.id if not set in properties, using a cluster should not be an issue.

@kasemir
Collaborator

kasemir commented Nov 14, 2025

Yes, KafkaHelper defaults to a group.id that is not just random but globally unique.
That has been tested with a single Kafka instance, alarm server, and multiple clients. As long as each client has a unique ID, all clients get all messages from a single Kafka instance.

But it has only been tested with a single Kafka instance. If there's a cluster, I'm not sure that the unique group ID is all that is necessary to ensure that all clients get all messages. So please check that, because Kafka does have operating modes to support the task idea, where one client and only one client needs to handle a message. We don't want to accidentally end up in that mode.
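
For example, a quick way to exercise that scenario: create a replicated, multi-partition topic on the cluster, then run two clients with different unique group IDs and confirm both see every message. A hypothetical setup sketch (topic name, partition count and bootstrap servers are made up for illustration):

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateClusterTestTopic
{
    public static void main(final String[] args) throws Exception
    {
        final Properties props = new Properties();
        // Placeholder broker list for a 3-node cluster
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "localhost:9092,localhost:9093,localhost:9094");

        try (Admin admin = Admin.create(props))
        {
            // 3 partitions, replication factor 3:
            // data is both spread and replicated across the cluster,
            // which is exactly the case where the unique-group assumption needs checking
            admin.createTopics(List.of(new NewTopic("AlarmClusterTest", 3, (short) 3)))
                 .all().get();
        }
        // Next step (not shown): publish N messages and verify that each of the
        // two clients, each with its own unique group.id, receives all N.
    }
}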

@shroffk
Member Author

shroffk commented Nov 14, 2025

Yes, clusters can be a pain... I have managed to avoid them in the past, but it turns out that a large number of facilities are using a 3-node Kafka setup (I believe nsls2 and eic are moving to the same setup).

So... I think the key here is that, based on our use case, we should ensure that each client gets a unique group.id.

I would be fine with updating the KafkaHelper such that it always sets a globally unique group.id. If a group.id is specified in the kafka.properties, a warning is logged and it is ignored.
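
Something along these lines, perhaps (only a sketch of the idea, not the actual patch; it assumes kafka_props is the Properties loaded from the preference and logger is the existing KafkaHelper logger, with java.util.UUID and java.util.logging.Level imported):

// Proposed behavior: the group.id is always generated; any group.id
// found in the kafka_properties preference is ignored with a warning.
if (kafka_props.containsKey("group.id"))
    logger.log(Level.WARNING,
               "Ignoring group.id '" + kafka_props.get("group.id") +
               "' from kafka_properties; each alarm client needs its own unique group to receive all updates");

// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Always use a unique group for each client
kafka_props.put("group.id", "Alarm-" + UUID.randomUUID());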

Additionally, I have added this compose file since it is what I have been using to test the alarm services with Kafka 4 (or 8 in the Confluent versioning scheme).

ensure unique group.id
remove the use of deprecated kafka streams interfaces
Contributor

Copilot AI left a comment


Pull Request Overview

This PR updates Kafka infrastructure and client code to support newer Kafka versions. It introduces a new Docker Compose configuration for a 3-node Kafka cluster and migrates from deprecated Kafka Streams API methods to their modern equivalents.

  • Creates a new docker-compose file for deploying a 3-node Kafka cluster using KRaft mode
  • Migrates Kafka Streams branching logic from deprecated branch() API to split() API with Branched consumers (see the sketch after this list)
  • Simplifies consumer group ID generation by removing conditional logic
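
As a rough illustration of the branch()-to-split() migration referenced above (not the actual AlarmMessageLogger code; the predicates and stream types here are invented for the example):

import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;

public class BranchMigrationSketch
{
    // Hypothetical predicates, standing in for the alarm message tests
    private static final Predicate<String, String> IS_STATE  = (key, value) -> key.startsWith("state:");
    private static final Predicate<String, String> IS_CONFIG = (key, value) -> key.startsWith("config:");

    @SuppressWarnings("deprecation")
    public static void oldStyle(final KStream<String, String> stream)
    {
        // Deprecated branch(): returns an array of sub-streams to handle afterwards
        final KStream<String, String>[] branches = stream.branch(IS_STATE, IS_CONFIG);
        branches[0].foreach((k, v) -> System.out.println("state  " + v));
        branches[1].foreach((k, v) -> System.out.println("config " + v));
    }

    public static void newStyle(final KStream<String, String> stream)
    {
        // Replacement split(): each branch is handled in place via a Branched consumer
        stream.split(Named.as("alarm-"))
              .branch(IS_STATE,  Branched.withConsumer(ks -> ks.foreach((k, v) -> System.out.println("state  " + v))))
              .branch(IS_CONFIG, Branched.withConsumer(ks -> ks.foreach((k, v) -> System.out.println("config " + v))))
              .noDefaultBranch();
    }
}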

Reviewed Changes

Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.

  • services/alarm-server/src/test/resources/docker/docker-compose-kafka-cluster.yml: New multi-node Kafka cluster configuration using KRaft mode with Confluent Platform 8.1.0
  • services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java: Migrates from deprecated branch() to split() API, adds UUID import, removes TimeUnit import, refactors method signatures
  • app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java: Simplifies consumer group ID logic by always generating unique UUIDs instead of conditional checking
  • .gitignore: Adds broader *.pyc pattern to ignore Python compiled files


@shroffk shroffk removed the wip work in progress label Nov 20, 2025
@shroffk shroffk merged commit 0cadf49 into master Nov 20, 2025
3 checks passed