diff --git a/Cargo.lock b/Cargo.lock index db8914b0b2dd7..5e6262b419876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10058,7 +10058,7 @@ dependencies = [ [[package]] name = "rdkafka" version = "0.29.0" -source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#c8d7ae601896af0efe9d737dfb0f84bea25b8518" +source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#642daa6e293065ec653c04f2d1147a7675ce2430" dependencies = [ "futures-channel", "futures-util", @@ -10075,7 +10075,7 @@ dependencies = [ [[package]] name = "rdkafka-sys" version = "4.3.0+2.5.0" -source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#c8d7ae601896af0efe9d737dfb0f84bea25b8518" +source = "git+https://github.com/MaterializeInc/rust-rdkafka.git#642daa6e293065ec653c04f2d1147a7675ce2430" dependencies = [ "cmake", "libc", diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 0181af23ae8ef..76e042785f417 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -744,6 +744,8 @@ pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(10); /// The timeout for reading records from the progress topic. Set to something slightly longer than /// the idle transaction timeout (60s) to wait out any stuck producers. pub const DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT: Duration = Duration::from_secs(90); +/// Delays marking a topic as non-existent until configured time has passed. +pub const DEFAULT_TOPIC_METADATA_PROPAGATION_MAX: Duration = Duration::from_secs(30); /// Configurable timeouts for Kafka connections. #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -761,6 +763,8 @@ pub struct TimeoutConfig { pub fetch_metadata_timeout: Duration, /// The timeout for reading records from the progress topic. pub progress_record_fetch_timeout: Duration, + /// Delays marking a topic as non-existent until configured time has passed. + pub topic_metadata_propagation_max: Duration, } impl Default for TimeoutConfig { @@ -772,6 +776,7 @@ impl Default for TimeoutConfig { socket_connection_setup_timeout: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT, fetch_metadata_timeout: DEFAULT_FETCH_METADATA_TIMEOUT, progress_record_fetch_timeout: DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT, + topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX, } } } @@ -879,6 +884,7 @@ impl TimeoutConfig { socket_connection_setup_timeout, fetch_metadata_timeout, progress_record_fetch_timeout, + topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX, } } } @@ -956,5 +962,13 @@ pub fn create_new_client_config( .to_string(), ); + config.set( + "topic.metadata.propagation.max.ms", + timeout_config + .topic_metadata_propagation_max + .as_millis() + .to_string(), + ); + config } diff --git a/src/storage-client/src/sink.rs b/src/storage-client/src/sink.rs index 5a6ce25d63a51..6b7e12e2b8423 100644 --- a/src/storage-client/src/sink.rs +++ b/src/storage-client/src/sink.rs @@ -162,10 +162,16 @@ pub async fn ensure_kafka_topic( }: &KafkaTopicOptions, ensure_topic_config: EnsureTopicConfig, ) -> Result { + let mut storage_configuration = storage_configuration.clone(); + // With recent librdkafka the topic metadata propagation for a sink error takes longer + storage_configuration + .parameters + .kafka_timeout_config + .topic_metadata_propagation_max = Duration::from_secs(10); let client: AdminClient<_> = connection .connection .create_with_context( - storage_configuration, + &storage_configuration, MzClientContext::default(), &BTreeMap::new(), // Only called from `mz_storage`. diff --git a/test/kafka-auth/test-kafka-sasl-ssl.td b/test/kafka-auth/test-kafka-sasl-ssl.td index 2834ee5aeb94c..5db3c97a1ec56 100644 --- a/test/kafka-auth/test-kafka-sasl-ssl.td +++ b/test/kafka-auth/test-kafka-sasl-ssl.td @@ -28,7 +28,7 @@ banana SASL PASSWORD SECRET password, SECURITY PROTOCOL SASL_PLAINTEXT ) -contains:Disconnected during handshake; broker might require SSL encryption +contains:Disconnected: connection closed by peer: receive 0 after POLLIN ! CREATE CONNECTION kafka_invalid TO KAFKA ( BROKER 'kafka:9096', diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td index 5aa509339521c..c3a7442679c29 100644 --- a/test/kafka-auth/test-kafka-ssl.td +++ b/test/kafka-auth/test-kafka-ssl.td @@ -26,7 +26,7 @@ banana BROKER 'kafka:9093', SECURITY PROTOCOL PLAINTEXT ) -contains:Disconnected during handshake; broker might require SSL encryption +contains:Disconnected: connection closed by peer: receive 0 after POLLIN ! CREATE CONNECTION kafka_invalid TO KAFKA ( BROKER 'kafka:9093' @@ -44,7 +44,7 @@ contains:Invalid CA certificate BROKER 'kafka:9093', SSL CERTIFICATE AUTHORITY = 'this is garbage' ) -contains:ssl.ca.pem failed: not in PEM format? +contains:failed to read certificate #0 from ssl.ca.pem: not in PEM format?: # ==> Test without an SSH tunnel. <==