@@ -49,30 +49,41 @@ public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
4949
5050 var regexTopicNames = topicNames . Select ( Helper . WildcardToRegex ) . ToList ( ) ;
5151
52- try
52+ var allowAutoCreate = true ;
53+ if ( _kafkaOptions . MainConfig . TryGetValue ( "allow.auto.create.topics" , out var autoCreateValue )
54+ && bool . TryParse ( autoCreateValue , out var parsedValue ) )
5355 {
54- var config = new AdminClientConfig ( _kafkaOptions . MainConfig ) { BootstrapServers = _kafkaOptions . Servers } ;
56+ allowAutoCreate = parsedValue ;
57+ }
58+
59+ if ( allowAutoCreate )
60+ {
61+ try
62+ {
63+ var config = new AdminClientConfig ( _kafkaOptions . MainConfig )
64+ { BootstrapServers = _kafkaOptions . Servers } ;
5565
56- using var adminClient = new AdminClientBuilder ( config ) . Build ( ) ;
66+ using var adminClient = new AdminClientBuilder ( config ) . Build ( ) ;
5767
58- adminClient . CreateTopicsAsync ( regexTopicNames . Select ( x => new TopicSpecification
68+ adminClient . CreateTopicsAsync ( regexTopicNames . Select ( x => new TopicSpecification
69+ {
70+ Name = x ,
71+ NumPartitions = _kafkaOptions . TopicOptions . NumPartitions ,
72+ ReplicationFactor = _kafkaOptions . TopicOptions . ReplicationFactor
73+ } ) ) . GetAwaiter ( ) . GetResult ( ) ;
74+ }
75+ catch ( CreateTopicsException ex ) when ( ex . Message . Contains ( "already exists" ) )
5976 {
60- Name = x ,
61- NumPartitions = _kafkaOptions . TopicOptions . NumPartitions ,
62- ReplicationFactor = _kafkaOptions . TopicOptions . ReplicationFactor
63- } ) ) . GetAwaiter ( ) . GetResult ( ) ;
64- }
65- catch ( CreateTopicsException ex ) when ( ex . Message . Contains ( "already exists" ) )
66- {
67- }
68- catch ( Exception ex )
69- {
70- var logArgs = new LogMessageEventArgs
77+ }
78+ catch ( Exception ex )
7179 {
72- LogType = MqLogType . ConsumeError ,
73- Reason = "An error was encountered when automatically creating topic! -->" + ex . Message
74- } ;
75- OnLogCallback ! ( logArgs ) ;
80+ var logArgs = new LogMessageEventArgs
81+ {
82+ LogType = MqLogType . ConsumeError ,
83+ Reason = "An error was encountered when automatically creating topic! -->" + ex . Message
84+ } ;
85+ OnLogCallback ! ( logArgs ) ;
86+ }
7687 }
7788
7889 return regexTopicNames ;
0 commit comments