|
|
@@ -52,10 +52,10 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
{ |
|
|
|
var consumerResult = _consumerClient.Consume(cancellationToken); |
|
|
|
|
|
|
|
if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue; |
|
|
|
if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; |
|
|
|
|
|
|
|
var headers = new Dictionary<string, string>(consumerResult.Headers.Count); |
|
|
|
foreach (var header in consumerResult.Headers) |
|
|
|
var headers = new Dictionary<string, string>(consumerResult.Message.Headers.Count); |
|
|
|
foreach (var header in consumerResult.Message.Headers) |
|
|
|
{ |
|
|
|
var val = header.GetValueBytes(); |
|
|
|
headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); |
|
|
@@ -71,7 +71,7 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var message = new TransportMessage(headers, consumerResult.Value); |
|
|
|
var message = new TransportMessage(headers, consumerResult.Message.Value); |
|
|
|
|
|
|
|
OnMessageReceived?.Invoke(consumerResult, message); |
|
|
|
} |
|
|
|