diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs index 29151a9..fe0cfe0 100644 --- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs @@ -34,9 +34,11 @@ namespace DotNetCore.CAP.Kafka { var headers = new Confluent.Kafka.Headers(); - foreach (var header in message.Headers.Select(x => new Header(x.Key, Encoding.UTF8.GetBytes(x.Value)))) + foreach (var header in message.Headers) { - headers.Add(header); + headers.Add(header.Value != null + ? new Header(header.Key, Encoding.UTF8.GetBytes(header.Value)) + : new Header(header.Key, null)); } var result = await producer.ProduceAsync(message.GetName(), new Message diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index d9e111e..26547a0 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -55,8 +55,15 @@ namespace DotNetCore.CAP.Kafka if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue; - var header = consumerResult.Headers.ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes())); - var message = new TransportMessage(header, consumerResult.Value); + var headers = new Dictionary(consumerResult.Headers.Count); + foreach (var header in consumerResult.Headers) + { + var val = header.GetValueBytes(); + headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); + } + headers.Add(Messages.Headers.Group, _groupId); + + var message = new TransportMessage(headers, consumerResult.Value); OnMessageReceived?.Invoke(consumerResult, message); }