Browse Source

upgrade Confluent.Kafka version.

master
Savorboard 7 years ago
parent
commit
050e71f52d
2 changed files with 19 additions and 13 deletions
  1. +1
    -1
      src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
  2. +18
    -12
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs

+ 1
- 1
src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj View File

@@ -14,7 +14,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Confluent.Kafka" Version="0.11.0" />
</ItemGroup>

<ItemGroup>


+ 18
- 12
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs View File

@@ -28,26 +28,32 @@ namespace DotNetCore.CAP.Kafka
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))
{
producer.ProduceAsync(keyName, null, content);
producer.Flush();
}
var message = producer.ProduceAsync(keyName, null, contentBytes).Result;

_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
if (!message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return Task.FromResult(OperateResult.Success);
return Task.FromResult(OperateResult.Success);
}
else
{
return Task.FromResult(OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
}));
}
}
}
catch (Exception ex)
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");

return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
return Task.FromResult(OperateResult.Failed(ex));
}
}
}

Loading…
Cancel
Save