Browse Source

rename file name

master
Savorboard 6 years ago
parent
commit
19edfbd219
1 changed files with 18 additions and 6 deletions
  1. +18
    -6
      src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs

src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs → src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs View File

@@ -2,8 +2,10 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Diagnostics;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -15,33 +17,43 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
private readonly IConnectionChannelPool _connectionChannelPool; private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly string _exchange;
private readonly string _hostAddress;


public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options, RabbitMQOptions rabbitMQOptions,
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger) IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
: base(logger, options, connection, stateChanger) : base(logger, options, connection, stateChanger)
{ {
_logger = logger; _logger = logger;
_connectionChannelPool = connectionChannelPool; _connectionChannelPool = connectionChannelPool;
_rabbitMQOptions = rabbitMQOptions;
_exchange = _connectionChannelPool.Exchange;
_hostAddress = _connectionChannelPool.HostAddress;
} }


public override Task<OperateResult> PublishAsync(string keyName, string content) public override Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Guid operationId = Guid.Empty;

var channel = _connectionChannelPool.Rent(); var channel = _connectionChannelPool.Rent();
try try
{ {
var body = Encoding.UTF8.GetBytes(content);
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress);


channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_rabbitMQOptions.TopicExchangeName, keyName, null, body);
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body);


s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");


return Task.FromResult(OperateResult.Success); return Task.FromResult(OperateResult.Success);
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed);

var wapperEx = new PublisherSentFailedException(ex.Message, ex); var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError var errors = new OperateError
{ {

Loading…
Cancel
Save