From b9228c4d5b61dec42685b4523fd6f2702fade14e Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 19 Nov 2019 16:01:10 +0800 Subject: [PATCH] Fix kafka consumer client bugs --- src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs | 6 ++++-- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 11 +++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index 29151a9..fe0cfe0 100644 --- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -34,9 +34,11 @@ namespace DotNetCore.CAP.Kafka { var headers = new Confluent.Kafka.Headers(); - foreach (var header in message.Headers.Select(x => new Header(x.Key, Encoding.UTF8.GetBytes(x.Value)))) + foreach (var header in message.Headers) { - headers.Add(header); + headers.Add(header.Value != null + ? new Header(header.Key, Encoding.UTF8.GetBytes(header.Value)) + : new Header(header.Key, null)); } var result = await producer.ProduceAsync(message.GetName(), new Message diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index d9e111e..26547a0 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -55,8 +55,15 @@ namespace DotNetCore.CAP.Kafka if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue; - var header = consumerResult.Headers.ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes())); - var message = new TransportMessage(header, consumerResult.Value); + var headers = new Dictionary(consumerResult.Headers.Count); + foreach (var header in consumerResult.Headers) + { + var val = header.GetValueBytes(); + headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); + } + headers.Add(Messages.Headers.Group, _groupId); + + var message = new TransportMessage(headers, consumerResult.Value); OnMessageReceived?.Invoke(consumerResult, message); }