diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs index 3e95be2..f3e4e5d 100644 --- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs +++ b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs @@ -2,10 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Diagnostics; using System.Text; using System.Threading.Tasks; -using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; @@ -16,7 +14,6 @@ namespace DotNetCore.CAP.Kafka { private readonly IConnectionPool _connectionPool; private readonly ILogger _logger; - private readonly string _serversAddress; public KafkaPublishMessageSender( CapOptions options, IStateChanger stateChanger, IStorageConnection connection, @@ -25,21 +22,15 @@ namespace DotNetCore.CAP.Kafka { _logger = logger; _connectionPool = connectionPool; - _serversAddress = _connectionPool.ServersAddress; + ServersAddress = _connectionPool.ServersAddress; } public override async Task PublishAsync(string keyName, string content) { - var startTime = DateTimeOffset.UtcNow; - var stopwatch = Stopwatch.StartNew(); - Guid operationId = Guid.Empty; - var producer = _connectionPool.Rent(); try { - operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress); - var contentBytes = Encoding.UTF8.GetBytes(content); var message = await producer.ProduceAsync(keyName, null, contentBytes); @@ -52,16 +43,12 @@ namespace DotNetCore.CAP.Kafka }); } - s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed); - _logger.LogDebug($"kafka topic message [{keyName}] has been published."); return OperateResult.Success; } catch (Exception ex) { - s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed); - var wapperEx = new PublisherSentFailedException(ex.Message, ex); return OperateResult.Failed(wapperEx); @@ -74,7 +61,6 @@ namespace DotNetCore.CAP.Kafka producer.Dispose(); } } - } - + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index ccce448..f279afb 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -29,6 +29,8 @@ namespace DotNetCore.CAP.Kafka public event EventHandler OnLog; + public string ServersAddress => _kafkaOptions.Servers; + public void Subscribe(IEnumerable topics) { if (topics == null) diff --git a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs index 2cb15c2..3dacba3 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs @@ -2,10 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Diagnostics; using System.Text; using System.Threading.Tasks; -using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; @@ -18,7 +16,6 @@ namespace DotNetCore.CAP.RabbitMQ private readonly IConnectionChannelPool _connectionChannelPool; private readonly ILogger _logger; private readonly string _exchange; - private readonly string _hostAddress; public RabbitMQPublishMessageSender(ILogger logger, CapOptions options, IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger) @@ -27,33 +24,24 @@ namespace DotNetCore.CAP.RabbitMQ _logger = logger; _connectionChannelPool = connectionChannelPool; _exchange = _connectionChannelPool.Exchange; - _hostAddress = _connectionChannelPool.HostAddress; + ServersAddress = _connectionChannelPool.HostAddress; } public override Task PublishAsync(string keyName, string content) { - var startTime = DateTimeOffset.UtcNow; - var stopwatch = Stopwatch.StartNew(); - Guid operationId = Guid.Empty; - var channel = _connectionChannelPool.Rent(); try { - operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress); - var body = Encoding.UTF8.GetBytes(content); channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); channel.BasicPublish(_exchange, keyName, null, body); - s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); return Task.FromResult(OperateResult.Success); } catch (Exception ex) { - s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed); - var wapperEx = new PublisherSentFailedException(ex.Message, ex); var errors = new OperateError { diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 402d0fa..a34054f 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -37,6 +37,8 @@ namespace DotNetCore.CAP.RabbitMQ public event EventHandler OnLog; + public string ServersAddress => _rabbitMQOptions.HostName; + public void Subscribe(IEnumerable topics) { if (topics == null) diff --git a/src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs b/src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs index 154309f..e833a8e 100644 --- a/src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs +++ b/src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs @@ -174,7 +174,7 @@ namespace DotNetCore.CAP.Diagnostics var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var parameterValues = context.DeliverMessage.Content; - @this.Write(CapBeforePublish, new SubscriberInvokeEventData(operationId, operation, methodName, + @this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName, subscribeName, subscribeGroup, parameterValues, DateTimeOffset.UtcNow)); @@ -198,7 +198,7 @@ namespace DotNetCore.CAP.Diagnostics var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var parameterValues = context.DeliverMessage.Content; - @this.Write(CapBeforePublish, new SubscriberInvokeEndEventData(operationId, operation, methodName, + @this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName, subscribeName, subscribeGroup, parameterValues, startTime, duration)); } @@ -219,7 +219,7 @@ namespace DotNetCore.CAP.Diagnostics var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var parameterValues = context.DeliverMessage.Content; - @this.Write(CapBeforePublish, new SubscriberInvokeErrorEventData(operationId, operation, methodName, + @this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName, subscribeName, subscribeGroup, parameterValues, ex, startTime, duration)); } diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index 8b46e71..150bc4c 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -13,6 +13,8 @@ namespace DotNetCore.CAP /// public interface IConsumerClient : IDisposable { + string ServersAddress { get; } + /// /// Subscribe to a set of topics to the message queue /// diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 43e34df..8d48a37 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -24,6 +24,7 @@ namespace DotNetCore.CAP private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly MethodMatcherCache _selector; + private string _serverAddress; private Task _compositeTask; private bool _disposed; @@ -56,6 +57,8 @@ namespace DotNetCore.CAP { using (var client = _consumerClientFactory.Create(matchGroup.Key)) { + _serverAddress = client.ServersAddress; + RegisterMessageProcessor(client); client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); @@ -102,30 +105,24 @@ namespace DotNetCore.CAP { var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew(); - var operationId = Guid.Empty; + + var tracingResult = TracingBefore(messageContext.Name, messageContext.Content); + var operationId = tracingResult.Item1; + var messageBody = tracingResult.Item2; var receivedMessage = new CapReceivedMessage(messageContext) { - StatusName = StatusName.Scheduled + StatusName = StatusName.Scheduled, + Content = messageBody }; try { - operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore( - messageContext.Name, - messageContext.Content, - messageContext.Group); - StoreMessage(receivedMessage); client.Commit(); - s_diagnosticListener.WriteReceiveMessageStoreAfter( - operationId, - messageContext.Name, - messageContext.Content, - messageContext.Group, - startTime, + TracingAfter(operationId, receivedMessage.Name, receivedMessage.Content, startTime, stopwatch.Elapsed); _dispatcher.EnqueueToExecute(receivedMessage); @@ -136,12 +133,7 @@ namespace DotNetCore.CAP client.Reject(); - s_diagnosticListener.WriteReceiveMessageStoreError(operationId, - messageContext.Name, - messageContext.Content, - messageContext.Group, - e, - startTime, + TracingError(operationId, receivedMessage.Name, receivedMessage.Content, e, startTime, stopwatch.Elapsed); } }; @@ -183,5 +175,50 @@ namespace DotNetCore.CAP receivedMessage.Id = id; } + + private (Guid, string) TracingBefore(string topic, string values) + { + Guid operationId = Guid.NewGuid(); + + var eventData = new BrokerConsumeEventData( + operationId, "", + _serverAddress, + topic, + values, + DateTimeOffset.UtcNow); + + s_diagnosticListener.WriteConsumeBefore(eventData); + + return (operationId, eventData.BrokerTopicBody); + } + + private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du) + { + var eventData = new BrokerConsumeEndEventData( + operationId, + "", + _serverAddress, + topic, + values, + startTime, + du); + + s_diagnosticListener.WriteConsumeAfter(eventData); + } + + private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du) + { + var eventData = new BrokerConsumeErrorEventData( + operationId, + "", + _serverAddress, + topic, + values, + ex, + startTime, + du); + + s_diagnosticListener.WriteConsumeError(eventData); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index 5b6588d..eb37cfc 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -21,6 +21,8 @@ namespace DotNetCore.CAP private readonly CapOptions _options; private readonly IStateChanger _stateChanger; + protected string ServersAddress { get; set; } + // diagnostics listener // ReSharper disable once InconsistentNaming protected static readonly DiagnosticListener s_diagnosticListener = @@ -42,25 +44,35 @@ namespace DotNetCore.CAP public async Task SendAsync(CapPublishedMessage message) { - var sp = Stopwatch.StartNew(); + var startTime = DateTimeOffset.UtcNow; + var stopwatch = Stopwatch.StartNew(); + + var tracingResult = TracingBefore(message.Name, message.Content); + var operationId = tracingResult.Item1; - var result = await PublishAsync(message.Name, message.Content); + var sendValues = tracingResult.Item2 != null + ? Helper.AddTracingHeaderProperty(message.Content, tracingResult.Item2) + : message.Content; - sp.Stop(); + var result = await PublishAsync(message.Name, sendValues); + stopwatch.Stop(); if (result.Succeeded) { await SetSuccessfulState(message); - _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); + TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed); return OperateResult.Success; } else { + TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed); + _logger.MessagePublishException(message.Id, result.Exception); await SetFailedState(message, result.Exception, out bool stillRetry); + if (stillRetry) { _logger.SenderRetrying(3); @@ -122,5 +134,51 @@ namespace DotNetCore.CAP { message.Content = Helper.AddExceptionProperty(message.Content, exception); } + + private (Guid, TracingHeaders) TracingBefore(string topic, string values) + { + Guid operationId = Guid.NewGuid(); + + var eventData = new BrokerPublishEventData( + operationId, "", + ServersAddress, topic, + values, + DateTimeOffset.UtcNow); + + s_diagnosticListener.WritePublishBefore(eventData); + + return (operationId, eventData.Headers); + } + + private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du) + { + var eventData = new BrokerPublishEndEventData( + operationId, + "", + ServersAddress, + topic, + values, + startTime, + du); + + s_diagnosticListener.WritePublishAfter(eventData); + + _logger.MessageHasBeenSent(du.TotalSeconds); + } + + private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du) + { + var eventData = new BrokerPublishErrorEventData( + operationId, + "", + ServersAddress, + topic, + values, + ex, + startTime, + du); + + s_diagnosticListener.WritePublishError(eventData); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index 0629852..92f988e 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -153,11 +153,11 @@ namespace DotNetCore.CAP try { - operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext); + operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext); - s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); + s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); if (!string.IsNullOrEmpty(ret.CallbackName)) { @@ -166,7 +166,7 @@ namespace DotNetCore.CAP } catch (Exception ex) { - s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); + s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); throw new SubscriberExecutionFailedException(ex.Message, ex); }