From c8d2b44370684fdfdf55d0b2d2d9eec666e37725 Mon Sep 17 00:00:00 2001 From: tsuz <6927131+tsuz@users.noreply.github.com> Date: Sun, 24 May 2026 16:03:19 +0900 Subject: [PATCH] replication factor shoudl be 3 by default --- .../java/io/flightdeck/streams/FlightDeckStreamsApp.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java index 836ed8a..c073f11 100644 --- a/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java +++ b/processor-apps/processing/src/main/java/io/flightdeck/streams/FlightDeckStreamsApp.java @@ -149,6 +149,11 @@ private static void ensureTopicsExist(Properties streamsProps) { KafkaEnvProps.apply(adminProps); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + int partitions = Integer.parseInt( + System.getenv().getOrDefault("KAFKA_TOPIC_PARTITIONS", "1")); + short replicationFactor = Short.parseShort( + System.getenv().getOrDefault("KAFKA_TOPIC_REPLICATION_FACTOR", "3")); + List requiredTopics = new java.util.ArrayList<>(List.of( Topics.MESSAGE_INPUT, Topics.ENRICHED_MESSAGE_INPUT, @@ -174,7 +179,7 @@ private static void ensureTopicsExist(Properties streamsProps) { List toCreate = requiredTopics.stream() .filter(t -> !existing.contains(t)) - .map(t -> new NewTopic(t, 1, (short) 1)) + .map(t -> new NewTopic(t, partitions, replicationFactor)) .collect(Collectors.toList()); if (!toCreate.isEmpty()) {