Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/kafka-util/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(10);
/// The timeout for reading records from the progress topic. Set to something slightly longer than
/// the idle transaction timeout (60s) to wait out any stuck producers.
pub const DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT: Duration = Duration::from_secs(90);
/// Delays marking a topic as non-existent until configured time has passed.
pub const DEFAULT_TOPIC_METADATA_PROPAGATION_MAX: Duration = Duration::from_secs(30);

/// Configurable timeouts for Kafka connections.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -761,6 +763,8 @@ pub struct TimeoutConfig {
pub fetch_metadata_timeout: Duration,
/// The timeout for reading records from the progress topic.
pub progress_record_fetch_timeout: Duration,
/// Delays marking a topic as non-existent until configured time has passed.
pub topic_metadata_propagation_max: Duration,
}

impl Default for TimeoutConfig {
Expand All @@ -772,6 +776,7 @@ impl Default for TimeoutConfig {
socket_connection_setup_timeout: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT,
fetch_metadata_timeout: DEFAULT_FETCH_METADATA_TIMEOUT,
progress_record_fetch_timeout: DEFAULT_PROGRESS_RECORD_FETCH_TIMEOUT,
topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX,
}
}
}
Expand Down Expand Up @@ -879,6 +884,7 @@ impl TimeoutConfig {
socket_connection_setup_timeout,
fetch_metadata_timeout,
progress_record_fetch_timeout,
topic_metadata_propagation_max: DEFAULT_TOPIC_METADATA_PROPAGATION_MAX,
}
}
}
Expand Down Expand Up @@ -956,5 +962,13 @@ pub fn create_new_client_config(
.to_string(),
);

config.set(
"topic.metadata.propagation.max.ms",
timeout_config
.topic_metadata_propagation_max
.as_millis()
.to_string(),
);

config
}
8 changes: 7 additions & 1 deletion src/storage-client/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ pub async fn ensure_kafka_topic(
}: &KafkaTopicOptions,
ensure_topic_config: EnsureTopicConfig,
) -> Result<bool, anyhow::Error> {
let mut storage_configuration = storage_configuration.clone();
// With recent librdkafka the topic metadata propagation for a sink error takes longer
storage_configuration
.parameters
.kafka_timeout_config
.topic_metadata_propagation_max = Duration::from_secs(10);
let client: AdminClient<_> = connection
.connection
.create_with_context(
storage_configuration,
&storage_configuration,
MzClientContext::default(),
&BTreeMap::new(),
// Only called from `mz_storage`.
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-auth/test-kafka-sasl-ssl.td
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ banana
SASL PASSWORD SECRET password,
SECURITY PROTOCOL SASL_PLAINTEXT
)
contains:Disconnected during handshake; broker might require SSL encryption
contains:Disconnected: connection closed by peer: receive 0 after POLLIN

! CREATE CONNECTION kafka_invalid TO KAFKA (
BROKER 'kafka:9096',
Expand Down
4 changes: 2 additions & 2 deletions test/kafka-auth/test-kafka-ssl.td
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ banana
BROKER 'kafka:9093',
SECURITY PROTOCOL PLAINTEXT
)
contains:Disconnected during handshake; broker might require SSL encryption
contains:Disconnected: connection closed by peer: receive 0 after POLLIN

! CREATE CONNECTION kafka_invalid TO KAFKA (
BROKER 'kafka:9093'
Expand All @@ -44,7 +44,7 @@ contains:Invalid CA certificate
BROKER 'kafka:9093',
SSL CERTIFICATE AUTHORITY = 'this is garbage'
)
contains:ssl.ca.pem failed: not in PEM format?
contains:failed to read certificate #0 from ssl.ca.pem: not in PEM format?:

# ==> Test without an SSH tunnel. <==

Expand Down