Explorar el Código

fix dead lock.

master
Savorboard hace 7 años
padre
commit
ea64bbcbd7
Se han modificado 1 ficheros con 6 adiciones y 6 borrados
  1. +6
    -6
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs

+ 6
- 6
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs Ver fichero

@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions = options;
}

public override async Task<OperateResult> PublishAsync(string keyName, string content)
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
@@ -29,21 +29,21 @@ namespace DotNetCore.CAP.Kafka
var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))
{
var message = await producer.ProduceAsync(keyName, null, contentBytes);
var message = producer.ProduceAsync(keyName, null, contentBytes).Result;

if (!message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return OperateResult.Success;
return Task.FromResult(OperateResult.Success);
}
else
{
return OperateResult.Failed(new OperateError
return Task.FromResult(OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
});
}));
}
}
}
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.Kafka
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");

return OperateResult.Failed(ex);
return Task.FromResult(OperateResult.Failed(ex));
}
}
}

Cargando…
Cancelar
Guardar