From 93804119b492228c1bf79545590677a173c401bc Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 2 Mar 2018 14:41:15 +0800 Subject: [PATCH] Fixed thread safety issue about KafkaOptions. #89 --- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 2affaa6..241870e 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -66,16 +66,18 @@ namespace DotNetCore.CAP.Kafka private void InitKafkaClient() { - _kafkaOptions.MainConfig["group.id"] = _groupId; + lock (_kafkaOptions) + { + _kafkaOptions.MainConfig["group.id"] = _groupId; - var config = _kafkaOptions.AsKafkaConfig(); - _consumerClient = new Consumer(config, null, StringDeserializer); - _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError; - _consumerClient.OnMessage += ConsumerClient_OnMessage; - _consumerClient.OnError += ConsumerClient_OnError; + var config = _kafkaOptions.AsKafkaConfig(); + _consumerClient = new Consumer(config, null, StringDeserializer); + _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError; + _consumerClient.OnMessage += ConsumerClient_OnMessage; + _consumerClient.OnError += ConsumerClient_OnError; + } } - private void ConsumerClient_OnConsumeError(object sender, Message e) { var message = e.Deserialize(null, StringDeserializer);