瀏覽代碼

Fixed thread safety issue about KafkaOptions. #89

master
Savorboard 6 年之前
父節點
當前提交
93804119b4
共有 1 個文件被更改,包括 9 次插入7 次删除
  1. +9
    -7
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+ 9
- 7
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs 查看文件

@@ -66,16 +66,18 @@ namespace DotNetCore.CAP.Kafka

private void InitKafkaClient()
{
_kafkaOptions.MainConfig["group.id"] = _groupId;
lock (_kafkaOptions)
{
_kafkaOptions.MainConfig["group.id"] = _groupId;

var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
}
}


private void ConsumerClient_OnConsumeError(object sender, Message e)
{
var message = e.Deserialize<Null, string>(null, StringDeserializer);


Loading…
取消
儲存