Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bulkerapp/app/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
}

}()
err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.RetryTopicConfig())
err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.config.TopicConfig("retry"))
if err != nil {
return counters, fmt.Errorf("failed to create retry topic %s: %v", bc.retryTopic, err)
}
Expand Down
2 changes: 1 addition & 1 deletion bulkerapp/app/stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (sc *StreamConsumerImpl) start() {
metrics.ConnectionMessageStatuses(sc.destination.Id(), sc.tableName, "deadLettered").Inc()
failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName
} else {
err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.RetryTopicConfig())
err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.config.TopicConfig("retry"))
if err != nil {
sc.Errorf("failed to create retry topic %s: %v", sc.retryTopic, err)
}
Expand Down
40 changes: 5 additions & 35 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
}
tm.allTopics = allTopics
tm.staleTopics = staleTopics
err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions,
map[string]string{
"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
})
err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, tm.config.TopicConfig("destination"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
Expand All @@ -352,22 +348,13 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create multi-threaded destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
}
err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, map[string]string{
"cleanup.policy": "delete,compact",
"retention.ms": fmt.Sprint(tm.config.KafkaDeadTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
})
err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, tm.config.TopicConfig("dead"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination dead letter topic [%s]: %v", tm.config.KafkaDestinationsDeadLetterTopicName, err)
}
destinationsRetryTopicName := tm.config.KafkaDestinationsRetryTopicName
err = tm.ensureTopic(destinationsRetryTopicName, 1, map[string]string{
"cleanup.policy": "delete,compact",
"segment.bytes": fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
"retention.ms": fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
})
err = tm.ensureTopic(destinationsRetryTopicName, 1, tm.config.TopicConfig("retry"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination retry topic [%s]: %v", destinationsRetryTopicName, err)
Expand Down Expand Up @@ -602,11 +589,7 @@ func (tm *TopicManager) createDestinationTopic(topic string, config map[string]s
errorType = "unknown stream mode"
return tm.NewError("Unknown stream mode: %s for topic: %s", mode, topic)
}
topicConfig := map[string]string{
"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
"compression.type": tm.config.KafkaTopicCompression,
}
topicConfig := tm.config.TopicConfig("destination")
utils.MapPutAll(topicConfig, config)
topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
{
Expand Down Expand Up @@ -645,11 +628,7 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
metrics.TopicManagerCreate(topic, "", "", "", "success", "").Inc()
}
}()
topicConfig := map[string]string{
"compression.type": tm.config.KafkaTopicCompression,
"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
}
topicConfig := tm.config.TopicConfig("destination")
utils.MapPutAll(topicConfig, config)
topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
{
Expand Down Expand Up @@ -679,15 +658,6 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
return nil
}

func (tm *TopicManager) RetryTopicConfig() map[string]string {
return map[string]string{
"cleanup.policy": "delete,compact",
"segment.bytes": fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
"retention.ms": fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
"segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
}
}

func (tm *TopicManager) Refresh() {
select {
case tm.refreshChan <- true:
Expand Down
29 changes: 29 additions & 0 deletions kafkabase/kafka_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type KafkaConfig struct {
KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
KafkaTopicSegmentHours int `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
KafkaAllowSegmentConfig bool `mapstructure:"KAFKA_ALLOW_SEGMENT_CONFIG" default:"true"`
KafkaTopicPrefix string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
KafkaFetchMessageMaxBytes int `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"`

Expand Down Expand Up @@ -94,6 +95,34 @@ func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap {
return kafkaConfig
}

func (c *KafkaConfig) TopicConfig(mode string) map[string]string {
config := map[string]string{}
config["compression.type"] = c.KafkaTopicCompression

switch mode {
case "retry":
config["retention.ms"] = fmt.Sprint(c.KafkaRetryTopicRetentionHours * 60 * 60 * 1000)
config["cleanup.policy"] = "delete,compact"

if c.KafkaAllowSegmentConfig {
config["segment.bytes"] = fmt.Sprint(c.KafkaRetryTopicSegmentBytes)
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
case "dead":
config["retention.ms"] = fmt.Sprint(c.KafkaDeadTopicRetentionHours * 60 * 60 * 1000)
config["cleanup.policy"] = "delete,compact"
if c.KafkaAllowSegmentConfig {
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
default:
config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000)
if c.KafkaAllowSegmentConfig {
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
}
return config
}

func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error {
return nil
}