diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 0790458..18d2b13 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -10,6 +10,9 @@ namespace DotNetCore.CAP /// public class KafkaOptions { + private IEnumerable> _kafkaConfig; + + public KafkaOptions() { MainConfig = new Dictionary(); @@ -33,23 +36,21 @@ namespace DotNetCore.CAP internal IEnumerable> AskafkaConfig() { - if (MainConfig.ContainsKey("bootstrap.servers")) - { - return MainConfig.AsEnumerable(); - } - - if (string.IsNullOrWhiteSpace(Servers)) + if (_kafkaConfig == null) { - throw new ArgumentNullException(nameof(Servers)); - } + if (string.IsNullOrWhiteSpace(Servers)) + { + throw new ArgumentNullException(nameof(Servers)); + } - MainConfig.Add("bootstrap.servers", Servers); + MainConfig["bootstrap.servers"] = Servers; + MainConfig["queue.buffering.max.ms"] = "10"; + MainConfig["socket.blocking.max.ms"] = "10"; + MainConfig["enable.auto.commit"] = "false"; - MainConfig["queue.buffering.max.ms"] = "10"; - MainConfig["socket.blocking.max.ms"] = "10"; - MainConfig["enable.auto.commit"] = "false"; - - return MainConfig.AsEnumerable(); + _kafkaConfig = MainConfig.AsEnumerable(); + } + return _kafkaConfig; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 1b65188..e06c77f 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -63,7 +63,7 @@ namespace DotNetCore.CAP.Kafka private void InitKafkaClient() { - _kafkaOptions.MainConfig.Add("group.id", _groupId); + _kafkaOptions.MainConfig["group.id"] = _groupId; var config = _kafkaOptions.AskafkaConfig(); _consumerClient = new Consumer(config, null, StringDeserializer);