Browse Source

Fix kafka consumer client bugs

master
Savorboard 5 years ago
parent
commit
b9228c4d5b
2 changed files with 13 additions and 4 deletions
  1. +4
    -2
      src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
  2. +9
    -2
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

+ 4
- 2
src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs View File

@@ -34,9 +34,11 @@ namespace DotNetCore.CAP.Kafka
{
var headers = new Confluent.Kafka.Headers();

foreach (var header in message.Headers.Select(x => new Header(x.Key, Encoding.UTF8.GetBytes(x.Value))))
foreach (var header in message.Headers)
{
headers.Add(header);
headers.Add(header.Value != null
? new Header(header.Key, Encoding.UTF8.GetBytes(header.Value))
: new Header(header.Key, null));
}

var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]>


+ 9
- 2
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -55,8 +55,15 @@ namespace DotNetCore.CAP.Kafka

if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue;

var header = consumerResult.Headers.ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));
var message = new TransportMessage(header, consumerResult.Value);
var headers = new Dictionary<string, string>(consumerResult.Headers.Count);
foreach (var header in consumerResult.Headers)
{
var val = header.GetValueBytes();
headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null);
}
headers.Add(Messages.Headers.Group, _groupId);

var message = new TransportMessage(headers, consumerResult.Value);

OnMessageReceived?.Invoke(consumerResult, message);
}


Loading…
Cancel
Save