diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs similarity index 66% rename from src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs rename to src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs index 9afedc7..2cb15c2 100644 --- a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs @@ -2,8 +2,10 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.Text; using System.Threading.Tasks; +using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; @@ -15,33 +17,43 @@ namespace DotNetCore.CAP.RabbitMQ { private readonly IConnectionChannelPool _connectionChannelPool; private readonly ILogger _logger; - private readonly RabbitMQOptions _rabbitMQOptions; + private readonly string _exchange; + private readonly string _hostAddress; - public RabbitMQPublishMessageSender(ILogger logger, CapOptions options, RabbitMQOptions rabbitMQOptions, + public RabbitMQPublishMessageSender(ILogger logger, CapOptions options, IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger) : base(logger, options, connection, stateChanger) { _logger = logger; _connectionChannelPool = connectionChannelPool; - _rabbitMQOptions = rabbitMQOptions; + _exchange = _connectionChannelPool.Exchange; + _hostAddress = _connectionChannelPool.HostAddress; } public override Task PublishAsync(string keyName, string content) { + var startTime = DateTimeOffset.UtcNow; + var stopwatch = Stopwatch.StartNew(); + Guid operationId = Guid.Empty; + var channel = _connectionChannelPool.Rent(); try { - var body = Encoding.UTF8.GetBytes(content); + operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress); - channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, true); - channel.BasicPublish(_rabbitMQOptions.TopicExchangeName, keyName, null, body); + var body = Encoding.UTF8.GetBytes(content); + channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); + channel.BasicPublish(_exchange, keyName, null, body); + s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); return Task.FromResult(OperateResult.Success); } catch (Exception ex) { + s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed); + var wapperEx = new PublisherSentFailedException(ex.Message, ex); var errors = new OperateError {