diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index 81686f3..0a8fe64 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs index 8252629..df49e8a 100644 --- a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs @@ -28,26 +28,32 @@ namespace DotNetCore.CAP.Kafka try { var config = _kafkaOptions.AsRdkafkaConfig(); - using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) + var contentBytes = Encoding.UTF8.GetBytes(content); + using (var producer = new Producer(config)) { - producer.ProduceAsync(keyName, null, content); - producer.Flush(); - } + var message = producer.ProduceAsync(keyName, null, contentBytes).Result; - _logger.LogDebug($"kafka topic message [{keyName}] has been published."); + if (!message.Error.HasError) + { + _logger.LogDebug($"kafka topic message [{keyName}] has been published."); - return Task.FromResult(OperateResult.Success); + return Task.FromResult(OperateResult.Success); + } + else + { + return Task.FromResult(OperateResult.Failed(new OperateError + { + Code = message.Error.Code.ToString(), + Description = message.Error.Reason + })); + } + } } catch (Exception ex) { _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); - return Task.FromResult(OperateResult.Failed(ex, - new OperateError() - { - Code = ex.HResult.ToString(), - Description = ex.Message - })); + return Task.FromResult(OperateResult.Failed(ex)); } } }