diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index 3c7225f..a63996c 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 3af3d95..e7c4d6b 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -52,10 +52,10 @@ namespace DotNetCore.CAP.Kafka { var consumerResult = _consumerClient.Consume(cancellationToken); - if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue; + if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; - var headers = new Dictionary(consumerResult.Headers.Count); - foreach (var header in consumerResult.Headers) + var headers = new Dictionary(consumerResult.Message.Headers.Count); + foreach (var header in consumerResult.Message.Headers) { var val = header.GetValueBytes(); headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); @@ -71,7 +71,7 @@ namespace DotNetCore.CAP.Kafka } } - var message = new TransportMessage(headers, consumerResult.Value); + var message = new TransportMessage(headers, consumerResult.Message.Value); OnMessageReceived?.Invoke(consumerResult, message); }