From 050e71f52dd2908b96fd89d340fd3131dd14a40c Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 25 Jul 2017 00:38:05 +0800 Subject: [PATCH] upgrade Confluent.Kafka version. --- .../DotNetCore.CAP.Kafka.csproj | 2 +- .../PublishQueueExecutor.cs | 30 +++++++++++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index 81686f3..0a8fe64 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs index 8252629..df49e8a 100644 --- a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs @@ -28,26 +28,32 @@ namespace DotNetCore.CAP.Kafka try { var config = _kafkaOptions.AsRdkafkaConfig(); - using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) + var contentBytes = Encoding.UTF8.GetBytes(content); + using (var producer = new Producer(config)) { - producer.ProduceAsync(keyName, null, content); - producer.Flush(); - } + var message = producer.ProduceAsync(keyName, null, contentBytes).Result; - _logger.LogDebug($"kafka topic message [{keyName}] has been published."); + if (!message.Error.HasError) + { + _logger.LogDebug($"kafka topic message [{keyName}] has been published."); - return Task.FromResult(OperateResult.Success); + return Task.FromResult(OperateResult.Success); + } + else + { + return Task.FromResult(OperateResult.Failed(new OperateError + { + Code = message.Error.Code.ToString(), + Description = message.Error.Reason + })); + } + } } catch (Exception ex) { _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); - return Task.FromResult(OperateResult.Failed(ex, - new OperateError() - { - Code = ex.HResult.ToString(), - Description = ex.Message - })); + return Task.FromResult(OperateResult.Failed(ex)); } } }