Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@ docs/source/_ext/**/*.pyc

# Ignore pip installation metadata
docs/phoebus_docs.egg-info/
*.pyc
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@ public static Consumer<String, String> connectConsumer(final String kafka_server
Properties kafka_props = loadPropsFromFile(properties_file);
kafka_props.put("bootstrap.servers", kafka_servers);

if (!kafka_props.containsKey("group.id")){
// API requires for Consumer to be in a group.
// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();
kafka_props.put("group.id", group_id);
}
// API requires for Consumer to be in a group.
// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();
kafka_props.put("group.id", group_id);

logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -19,12 +19,15 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.phoebus.applications.alarm.AlarmSystemConstants;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.messages.AlarmConfigMessage;
import org.phoebus.applications.alarm.messages.AlarmMessage;
Expand Down Expand Up @@ -71,14 +74,19 @@ public void run() {
Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties",""));
kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages");

if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
} else {
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
}


kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
props.getOrDefault(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));

// API requires for Consumer to be in a group.
// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();
kafkaProps.put("group.id", group_id);

AlarmSystemConstants.logger.fine(kafkaProps.getProperty("group.id") + " subscribes to "
+ kafkaProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) + " for " + topic);

final String indexDateSpanUnits = props.getProperty("date_span_units");
final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names"));

Expand Down Expand Up @@ -112,14 +120,15 @@ public long extract(ConsumerRecord<Object, Object> record, long previousTimestam
return new KeyValue<String, AlarmMessage>(key, value);
});

@SuppressWarnings("unchecked")
KStream<String, AlarmMessage>[] alarmBranches = alarms.branch((k,v) -> k.startsWith("state"),
(k,v) -> k.startsWith("config"),
(k,v) -> false
);

processAlarmStateStream(alarmBranches[0], props);
processAlarmConfigurationStream(alarmBranches[1], props);
alarms.split(Named.as("alarm-"))
.branch((k, v) -> k.startsWith("state"),
Branched.withConsumer(alarmStateStream -> processAlarmStateStream(alarmStateStream)))
.branch((k, v) -> k.startsWith("config"),
Branched.withConsumer(alarmConfigStream -> processAlarmConfigurationStream(alarmConfigStream)))
.defaultBranch(Branched.withConsumer(stream -> {
// Log each unmatched key in the default branch
stream.foreach((k, v) -> logger.warning("Unknown alarm message type for key: " + k));
}));

final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps);
final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -143,7 +152,7 @@ public void run() {
System.exit(0);
}

private void processAlarmStateStream(KStream<String, AlarmMessage> alarmStateBranch, Properties props) {
private void processAlarmStateStream(KStream<String, AlarmMessage> alarmStateBranch) {

KStream<String, AlarmStateMessage> transformedAlarms = alarmStateBranch
.transform(new TransformerSupplier<String, AlarmMessage, KeyValue<String, AlarmStateMessage>>() {
Expand Down Expand Up @@ -193,7 +202,7 @@ public void close() {

}

private void processAlarmConfigurationStream(KStream<String, AlarmMessage> alarmConfigBranch, Properties props) {
private void processAlarmConfigurationStream(KStream<String, AlarmMessage> alarmConfigBranch) {
KStream<String, AlarmConfigMessage> alarmConfigMessages = alarmConfigBranch.transform(new TransformerSupplier<String, AlarmMessage, KeyValue<String,AlarmConfigMessage>>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Local 3-node Kafka cluster running in KRaft (ZooKeeper-less) mode.
# NOTE(review): the top-level 'version' key is obsolete in the Compose Spec
# and ignored by modern Docker Compose; kept for older engines.
version: '2.2'

services:
  kafka1:
    image: confluentinc/cp-kafka:8.1.0
    container_name: kafka1
    ports:
      # Only the host-facing client listener is published; inter-broker and
      # controller traffic stays on the container network.
      - "9192:9192"
    environment:
      # All three brokers must share the same CLUSTER_ID. KRaft requires a
      # 22-character base64-encoded UUID (16 bytes) — generate your own with
      # `kafka-storage random-uuid`. (The previous 24-character value was not
      # a valid Kafka Uuid and storage formatting would reject it.)
      CLUSTER_ID: 'MkU3OEVhNTcwNTJCNDM2Qg'
      KAFKA_NODE_ID: 1
      # Combined mode: each node is both a broker and a KRaft controller.
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
      # PLAINTEXT serves host clients, INTERNAL serves broker-to-broker
      # replication, CONTROLLER serves the KRaft quorum.
      KAFKA_LISTENERS: PLAINTEXT://:9192,INTERNAL://:9194,CONTROLLER://:9193
      # Advertising only localhost would break replication in a 3-node
      # cluster (each broker would look for its peers on its own loopback);
      # the INTERNAL listener is advertised under the container hostname so
      # brokers can reach each other, while host clients use localhost.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9192,INTERNAL://kafka1:9194
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      # Replication factor 3 with min ISR 2 tolerates one broker failure.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:
      - kafka1_data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:8.1.0
    container_name: kafka2
    ports:
      - "9292:9292"
    environment:
      CLUSTER_ID: 'MkU3OEVhNTcwNTJCNDM2Qg'
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
      KAFKA_LISTENERS: PLAINTEXT://:9292,INTERNAL://:9294,CONTROLLER://:9193
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9292,INTERNAL://kafka2:9294
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:
      - kafka2_data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:8.1.0
    container_name: kafka3
    ports:
      - "9392:9392"
    environment:
      CLUSTER_ID: 'MkU3OEVhNTcwNTJCNDM2Qg'
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
      KAFKA_LISTENERS: PLAINTEXT://:9392,INTERNAL://:9394,CONTROLLER://:9193
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9392,INTERNAL://kafka3:9394
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:
      - kafka3_data:/var/lib/kafka/data

# Named volumes persist each broker's log directory across container restarts.
volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data: