diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 384a6eeccd2ca..37f74f0189d9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -471,11 +471,6 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler us } processStreamThread(thread -> thread.setUncaughtExceptionHandler((t, e) -> { } )); - - if (globalStreamThread != null) { - globalStreamThread.setUncaughtExceptionHandler((t, e) -> { } - ); - } } else { throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " + "Current state is: " + state); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 173ebdb8d4b44..1025adc674480 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -239,7 +239,7 @@ static class StateConsumer { } /** - * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws IllegalStateException If a store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ void initialize() { @@ -431,7 +431,7 @@ private StateConsumer initialize() { } catch (final StreamsException fatalException) { closeStateConsumer(stateConsumer, false); startupException = fatalException; - } catch (final Exception fatalException) { + } catch (final Throwable fatalException) { closeStateConsumer(stateConsumer, false); startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException); } finally { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 05525ca439dd1..ff428828e05b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -124,12 +124,7 @@ public void process(final Record record) { ); baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath(); - final HashMap properties = new HashMap<>(); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah"); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId"); - properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName); - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); + final HashMap properties = getStreamProperties(); config = new StreamsConfig(properties); globalStreamThread = new GlobalStreamThread( builder.rewriteTopology(config).buildGlobalStateTopology(), @@ -407,6 +402,51 @@ public void shouldTimeOutOnGlobalConsumerInstanceId() throws Exception { } } + @Test + public void shouldThrowStreamsExceptionOnStartupIfThrowableOccurred() throws Exception { + final String exceptionMessage = "Throwable occurred!"; + final MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) { + @Override + public List partitionsFor(final String topic) { + throw new ExceptionInInitializerError(exceptionMessage); + } + }; + final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME); + globalStreamThread = new GlobalStreamThread( + builder.buildGlobalStateTopology(), + config, + consumer, + new StateDirectory(config, time, true, false), + 0, + new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time), + time, + "clientId", + stateRestoreListener, + e -> { } + ); + + try { + globalStreamThread.start(); + fail("Should have thrown StreamsException if start up failed"); + } catch (final StreamsException e) { + assertThat(e.getCause(), instanceOf(Throwable.class)); + assertThat(e.getCause().getMessage(), equalTo(exceptionMessage)); + } + globalStreamThread.join(); + assertThat(globalStore.isOpen(), is(false)); + assertFalse(globalStreamThread.stillRunning()); + } + + private HashMap getStreamProperties() { + final HashMap properties = new HashMap<>(); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah"); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId"); + properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); + return properties; + } + private void initializeConsumer() { mockConsumer.updatePartitions( GLOBAL_STORE_TOPIC_NAME,