Savorboard пре 6 година
родитељ
комит
8689acd9ae
1 измењених фајлова са 18 додато и 6 уклоњено
  1. +18
    -6
      src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs

src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs → src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs Прегледај датотеку

@@ -2,8 +2,10 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
@@ -15,33 +17,43 @@ namespace DotNetCore.CAP.RabbitMQ
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly string _exchange;
private readonly string _hostAddress;

public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options, RabbitMQOptions rabbitMQOptions,
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = rabbitMQOptions;
_exchange = _connectionChannelPool.Exchange;
_hostAddress = _connectionChannelPool.HostAddress;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Guid operationId = Guid.Empty;

var channel = _connectionChannelPool.Rent();
try
{
var body = Encoding.UTF8.GetBytes(content);
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress);

channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName, keyName, null, body);
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body);

s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");

return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed);

var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{

Loading…
Откажи
Сачувај