From 96287930135215e838236e15f0e820f3ebe78537 Mon Sep 17 00:00:00 2001
From: Dennis Felsing
Date: Mon, 3 Nov 2025 22:47:13 +0000
Subject: [PATCH 1/4] build(deps): Switch to upstream librdkafka

Using https://github.com/MaterializeInc/rust-rdkafka/pull/35
---
 Cargo.lock | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index db8914b0b2dd7..5e6262b419876 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10058,7 +10058,7 @@ dependencies = [
 [[package]]
 name = "rdkafka"
 version = "0.29.0"
-source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#c8d7ae601896af0efe9d737dfb0f84bea25b8518"
+source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#642daa6e293065ec653c04f2d1147a7675ce2430"
 dependencies = [
  "futures-channel",
  "futures-util",
@@ -10075,7 +10075,7 @@ dependencies = [
 [[package]]
 name = "rdkafka-sys"
 version = "4.3.0+2.5.0"
-source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#c8d7ae601896af0efe9d737dfb0f84bea25b8518"
+source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#642daa6e293065ec653c04f2d1147a7675ce2430"
 dependencies = [
  "cmake",
  "libc",

From 8c3c1842e11a7b650839cc2f7206fa2a3cebdb5f Mon Sep 17 00:00:00 2001
From: Dennis Felsing
Date: Tue, 4 Nov 2025 09:12:03 +0000
Subject: [PATCH 2/4] Adapt kafka-auth test to new librdkafka version

---
 test/kafka-auth/test-kafka-sasl-ssl.td | 2 +-
 test/kafka-auth/test-kafka-ssl.td      | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/kafka-auth/test-kafka-sasl-ssl.td b/test/kafka-auth/test-kafka-sasl-ssl.td
index 2834ee5aeb94c..5db3c97a1ec56 100644
--- a/test/kafka-auth/test-kafka-sasl-ssl.td
+++ b/test/kafka-auth/test-kafka-sasl-ssl.td
@@ -28,7 +28,7 @@ banana
     SASL PASSWORD SECRET password,
     SECURITY PROTOCOL SASL_PLAINTEXT
   )
-contains:Disconnected during handshake; broker might require SSL encryption
+contains:Disconnected: connection closed by peer: receive 0 after POLLIN

 ! CREATE CONNECTION kafka_invalid TO KAFKA (
     BROKER 'kafka:9096',
diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td
index 5aa509339521c..c3a7442679c29 100644
--- a/test/kafka-auth/test-kafka-ssl.td
+++ b/test/kafka-auth/test-kafka-ssl.td
@@ -26,7 +26,7 @@ banana
     BROKER 'kafka:9093',
     SECURITY PROTOCOL PLAINTEXT
   )
-contains:Disconnected during handshake; broker might require SSL encryption
+contains:Disconnected: connection closed by peer: receive 0 after POLLIN

 ! CREATE CONNECTION kafka_invalid TO KAFKA (
     BROKER 'kafka:9093'
@@ -44,7 +44,7 @@ contains:Invalid CA certificate
     BROKER 'kafka:9093',
     SSL CERTIFICATE AUTHORITY = 'this is garbage'
   )
-contains:ssl.ca.pem failed: not in PEM format?
+contains:failed to read certificate #0 from ssl.ca.pem: not in PEM format?:

 # ==> Test without an SSH tunnel. <==

From 48d8696032c0597a617c13a1e9603181c407bcd6 Mon Sep 17 00:00:00 2001
From: Dennis Felsing
Date: Tue, 4 Nov 2025 09:35:21 +0000
Subject: [PATCH 3/4] Adapt source/sink error test to new librdkafka version

---
 test/source-sink-errors/mzcompose.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index aa4d56fbe078f..4c1e5b631320b 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -331,7 +331,8 @@ def assert_error(self, c: Composition, error: str) -> None:
         c.testdrive(
             dedent(
                 f"""
-                $ set-sql-timeout duration=60s
+                # Takes > 60s following librdkafka update
+                $ set-sql-timeout duration=240s
                 # Sinks generally halt after receiving an error, which means that they may alternate
                 # between `stalled` and `starting`. Instead of relying on the current status, we
                 # check that there is a stalled status with the expected error.

From 46e7863dbc5667e429bd6db7ab5e70958a4ade35 Mon Sep 17 00:00:00 2001
From: Dennis Felsing
Date: Wed, 5 Nov 2025 22:22:51 +0000
Subject: [PATCH 4/4] DNM: Failed to set for sinks only

---
 src/kafka-util/src/client.rs         | 14 ++++++++++++++
 src/storage-client/src/sink.rs       |  8 +++++++-
 test/source-sink-errors/mzcompose.py |  3 +--
 3 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs
index 0181af23ae8ef..76e042785f417 100644
--- a/src/kafka-util/src/client.rs
+++ b/src/kafka-util/src/client.rs
@@ -744,6 +744,8 @@ pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(10);
 /// The timeout for reading records from the progress topic. Set to something slightly longer than
 /// the idle transaction timeout (60s) to wait out any stuck producers.
 pub const DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT: Duration = Duration::from_secs(90);
+/// Delays marking a topic as non-existent until configured time has passed.
+pub const DEFAULT_TOPIC_METADATA_PROPAGATION_MAX: Duration = Duration::from_secs(30);

 /// Configurable timeouts for Kafka connections.
 #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -761,6 +763,8 @@ pub struct TimeoutConfig {
     pub fetch_metadata_timeout: Duration,
     /// The timeout for reading records from the progress topic.
     pub progress_record_fetch_timeout: Duration,
+    /// Delays marking a topic as non-existent until configured time has passed.
+    pub topic_metadata_propagation_max: Duration,
 }

 impl Default for TimeoutConfig {
@@ -772,6 +776,7 @@ impl Default for TimeoutConfig {
             socket_connection_setup_timeout: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT,
             fetch_metadata_timeout: DEFAULT_FETCH_METADATA_TIMEOUT,
             progress_record_fetch_timeout: DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT,
+            topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX,
         }
     }
 }
@@ -879,6 +884,7 @@ impl TimeoutConfig {
             socket_connection_setup_timeout,
             fetch_metadata_timeout,
             progress_record_fetch_timeout,
+            topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX,
         }
     }
 }
@@ -956,5 +962,13 @@ pub fn create_new_client_config(
             .to_string(),
     );

+    config.set(
+        "topic.metadata.propagation.max.ms",
+        timeout_config
+            .topic_metadata_propagation_max
+            .as_millis()
+            .to_string(),
+    );
+
     config
 }
diff --git a/src/storage-client/src/sink.rs b/src/storage-client/src/sink.rs
index 5a6ce25d63a51..6b7e12e2b8423 100644
--- a/src/storage-client/src/sink.rs
+++ b/src/storage-client/src/sink.rs
@@ -162,10 +162,16 @@ pub async fn ensure_kafka_topic(
     }: &KafkaTopicOptions,
     ensure_topic_config: EnsureTopicConfig,
 ) -> Result {
+    let mut storage_configuration = storage_configuration.clone();
+    // With recent librdkafka the topic metadata propagation for a sink error takes longer
+    storage_configuration
+        .parameters
+        .kafka_timeout_config
+        .topic_metadata_propagation_max = Duration::from_secs(10);
     let client: AdminClient<_> = connection
         .connection
         .create_with_context(
-            storage_configuration,
+            &storage_configuration,
             MzClientContext::default(),
             &BTreeMap::new(),
             // Only called from `mz_storage`.
diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index 4c1e5b631320b..aa4d56fbe078f 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -331,8 +331,7 @@ def assert_error(self, c: Composition, error: str) -> None:
         c.testdrive(
             dedent(
                 f"""
-                # Takes > 60s following librdkafka update
-                $ set-sql-timeout duration=240s
+                $ set-sql-timeout duration=60s
                 # Sinks generally halt after receiving an error, which means that they may alternate
                 # between `stalled` and `starting`. Instead of relying on the current status, we
                 # check that there is a stalled status with the expected error.
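
As a reference for how the new knob in PATCH 4 reaches librdkafka, below is a minimal
standalone sketch using rust-rdkafka's `ClientConfig`. Only the config key and the
30s default / 10s sink override come from the patches above; the helper name and
broker address are illustrative assumptions, not code from this series.

use std::time::Duration;

use rdkafka::ClientConfig;

/// Illustrative helper (not part of the patches): forwards the propagation
/// window to librdkafka. librdkafka waits this long before reporting a newly
/// referenced topic as non-existent, which is why sink-topic creation wants a
/// smaller value than general clients.
fn set_topic_metadata_propagation(config: &mut ClientConfig, max: Duration) {
    config.set(
        "topic.metadata.propagation.max.ms",
        max.as_millis().to_string(),
    );
}

fn main() {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092"); // placeholder broker
    // General clients would keep the 30s default; the sink path overrides to 10s.
    set_topic_metadata_propagation(&mut config, Duration::from_secs(10));
}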