diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs index cd97d42..a3f4c80 100644 --- a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs @@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka _kafkaOptions = options; } - public override async Task PublishAsync(string keyName, string content) + public override Task PublishAsync(string keyName, string content) { try { @@ -29,21 +29,21 @@ namespace DotNetCore.CAP.Kafka var contentBytes = Encoding.UTF8.GetBytes(content); using (var producer = new Producer(config)) { - var message = await producer.ProduceAsync(keyName, null, contentBytes); + var message = producer.ProduceAsync(keyName, null, contentBytes).Result; if (!message.Error.HasError) { _logger.LogDebug($"kafka topic message [{keyName}] has been published."); - return OperateResult.Success; + return Task.FromResult(OperateResult.Success); } else { - return OperateResult.Failed(new OperateError + return Task.FromResult(OperateResult.Failed(new OperateError { Code = message.Error.Code.ToString(), Description = message.Error.Reason - }); + })); } } } @@ -51,7 +51,7 @@ namespace DotNetCore.CAP.Kafka { _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); - return OperateResult.Failed(ex); + return Task.FromResult(OperateResult.Failed(ex)); } } }