diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index b7231dc..e3167d8 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -3,8 +3,10 @@ using System; using System.Data; +using System.Diagnostics; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; @@ -15,6 +17,11 @@ namespace DotNetCore.CAP.Abstractions private readonly IDispatcher _dispatcher; private readonly ILogger _logger; + // diagnostics listener + // ReSharper disable once InconsistentNaming + private static readonly DiagnosticListener s_diagnosticListener = + new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); + protected CapPublisherBase(ILogger logger, IDispatcher dispatcher) { _logger = logger; @@ -75,13 +82,13 @@ namespace DotNetCore.CAP.Abstractions protected virtual string Serialize(T obj, string callbackName = null) { - var packer = (IMessagePacker) ServiceProvider.GetService(typeof(IMessagePacker)); + var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker)); string content; if (obj != null) { if (Helper.IsComplexType(obj.GetType())) { - var serializer = (IContentSerializer) ServiceProvider.GetService(typeof(IContentSerializer)); + var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer)); content = serializer.Serialize(obj); } else @@ -179,16 +186,20 @@ namespace DotNetCore.CAP.Abstractions private void PublishWithTrans(string name, T contentObj, string callbackName = null) { - try + Guid operationId = default(Guid); + + var content = Serialize(contentObj, callbackName); + + var message = new CapPublishedMessage { - var content = Serialize(contentObj, callbackName); + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; + try + { + operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); var id = Execute(DbConnection, DbTransaction, message); @@ -197,15 +208,15 @@ namespace DotNetCore.CAP.Abstractions if (id > 0) { _logger.LogInformation($"message [{message}] has been persisted in the database."); - + s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); message.Id = id; - Enqueue(message); } } catch (Exception e) { _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); + s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); Console.WriteLine(e); throw; } diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index 8bf4189..464a2a5 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -55,6 +55,7 @@ + diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 86baecc..b9bbd2d 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -25,6 +26,11 @@ namespace DotNetCore.CAP private Task _compositeTask; private bool _disposed; + // diagnostics listener + // ReSharper disable once InconsistentNaming + private static readonly DiagnosticListener s_diagnosticListener = + new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); + public ConsumerHandler(IConsumerClientFactory consumerClientFactory, IDispatcher dispatcher, IStorageConnection connection, @@ -91,21 +97,34 @@ namespace DotNetCore.CAP private void RegisterMessageProcessor(IConsumerClient client) { - client.OnMessageReceived += (sender, message) => + client.OnMessageReceived += (sender, messageContext) => { + Guid operationId = default(Guid); + + var receivedMessage = new CapReceivedMessage(messageContext) + { + StatusName = StatusName.Scheduled + }; + try { - var storedMessage = StoreMessage(message); + operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage); + + StoreMessage(receivedMessage); client.Commit(); - _dispatcher.EnqueueToExecute(storedMessage); + s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage); + + _dispatcher.EnqueueToExecute(receivedMessage); } catch (Exception e) { - _logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", - message); + _logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", messageContext); + client.Reject(); + + s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e); } }; @@ -139,15 +158,12 @@ namespace DotNetCore.CAP } } - private CapReceivedMessage StoreMessage(MessageContext messageContext) + private void StoreMessage(CapReceivedMessage receivedMessage) { - var receivedMessage = new CapReceivedMessage(messageContext) - { - StatusName = StatusName.Scheduled - }; - var id = _connection.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult(); + var id = _connection.StoreReceivedMessageAsync(receivedMessage) + .GetAwaiter().GetResult(); + receivedMessage.Id = id; - return receivedMessage; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index d57fc8c..31c1f2a 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -20,6 +20,11 @@ namespace DotNetCore.CAP private readonly CapOptions _options; private readonly IStateChanger _stateChanger; + // diagnostics listener + // ReSharper disable once InconsistentNaming + private static readonly DiagnosticListener s_diagnosticListener = + new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); + protected BasePublishMessageSender( ILogger logger, CapOptions options, @@ -37,6 +42,7 @@ namespace DotNetCore.CAP public async Task SendAsync(CapPublishedMessage message) { var sp = Stopwatch.StartNew(); + var operationId = s_diagnosticListener.WritePublishBefore(message); var result = await PublishAsync(message.Name, message.Content); @@ -45,22 +51,26 @@ namespace DotNetCore.CAP if (result.Succeeded) { await SetSuccessfulState(message); + + s_diagnosticListener.WritePublishAfter(operationId, message); _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); return OperateResult.Success; } - - _logger.MessagePublishException(message.Id, result.Exception); - - await SetFailedState(message, result.Exception, out bool stillRetry); - - if (stillRetry) + else { - _logger.SenderRetrying(3); + s_diagnosticListener.WritePublishError(operationId, message, result.Exception); + _logger.MessagePublishException(message.Id, result.Exception); - await SendAsync(message); + await SetFailedState(message, result.Exception, out bool stillRetry); + if (stillRetry) + { + _logger.SenderRetrying(3); + + await SendAsync(message); + } + return OperateResult.Failed(result.Exception); } - return OperateResult.Failed(result.Exception); } private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index 0ea2b13..9133863 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -18,10 +18,14 @@ namespace DotNetCore.CAP private readonly ICallbackMessageSender _callbackMessageSender; private readonly IStorageConnection _connection; private readonly ILogger _logger; + private readonly IStateChanger _stateChanger; private readonly CapOptions _options; - private readonly MethodMatcherCache _selector; - private readonly IStateChanger _stateChanger; + + // diagnostics listener + // ReSharper disable once InconsistentNaming + private static readonly DiagnosticListener s_diagnosticListener = + new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); public DefaultSubscriberExecutor( ILogger logger, @@ -139,12 +143,18 @@ namespace DotNetCore.CAP var error = $"message can not be found subscriber, Message:{receivedMessage},\r\n see: https://github.com/dotnetcore/CAP/issues/63"; throw new SubscriberNotFoundException(error); } + + Guid operationId = default(Guid); + var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); + try { - var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); + operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext); + s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext); + if (!string.IsNullOrEmpty(ret.CallbackName)) { await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); @@ -152,6 +162,8 @@ namespace DotNetCore.CAP } catch (Exception ex) { + s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex); + throw new SubscriberExecutionFailedException(ex.Message, ex); } } diff --git a/src/DotNetCore.CAP/Internal/CapDiagnosticListenerExtensions.cs b/src/DotNetCore.CAP/Internal/CapDiagnosticListenerExtensions.cs new file mode 100644 index 0000000..3f5999e --- /dev/null +++ b/src/DotNetCore.CAP/Internal/CapDiagnosticListenerExtensions.cs @@ -0,0 +1,250 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.Internal +{ + /// + /// Extension methods on the DiagnosticListener class to log CAP data + /// + internal static class CapDiagnosticListenerExtensions + { + public const string DiagnosticListenerName = "CapDiagnosticListener"; + + private const string CapPrefix = "DotNetCore.CAP."; + + public const string CapBeforePublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreBefore); + public const string CapAfterPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreAfter); + public const string CapErrorPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreError); + + public const string CapBeforePublish = CapPrefix + nameof(WritePublishBefore); + public const string CapAfterPublish = CapPrefix + nameof(WritePublishAfter); + public const string CapErrorPublish = CapPrefix + nameof(WritePublishError); + + public const string CapBeforeReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreBefore); + public const string CapAfterReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreAfter); + public const string CapErrorReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreError); + + public const string CapBeforeConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeBefore); + public const string CapAfterConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeAfter); + public const string CapErrorConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeError); + + public static Guid WritePublishMessageStoreBefore(this DiagnosticListener @this, CapPublishedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapBeforePublishMessageStore)) + { + Guid operationId = Guid.NewGuid(); + + @this.Write(CapBeforePublishMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageName = message.Name, + MessageContent = message.Content + }); + + return operationId; + } + return Guid.Empty; + } + + public static void WritePublishMessageStoreAfter(this DiagnosticListener @this, Guid operationId, CapPublishedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapAfterPublishMessageStore)) + { + @this.Write(CapAfterPublishMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static void WritePublishMessageStoreError(this DiagnosticListener @this, Guid operationId, + CapPublishedMessage message, Exception ex, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapErrorPublishMessageStore)) + { + @this.Write(CapErrorPublishMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageName = message.Name, + MessageContent = message.Content, + Exception = ex, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static Guid WritePublishBefore(this DiagnosticListener @this, CapPublishedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapBeforePublish)) + { + Guid operationId = Guid.NewGuid(); + + @this.Write(CapBeforePublish, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content + }); + + return operationId; + } + return Guid.Empty; + } + + public static void WritePublishAfter(this DiagnosticListener @this, Guid operationId, CapPublishedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapAfterPublish)) + { + @this.Write(CapAfterPublish, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static void WritePublishError(this DiagnosticListener @this, Guid operationId, + CapPublishedMessage message, Exception ex, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapErrorPublish)) + { + @this.Write(CapErrorPublish, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content, + Exception = ex, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static Guid WriteReceiveMessageStoreBefore(this DiagnosticListener @this, CapReceivedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapBeforeReceiveMessageStore)) + { + Guid operationId = Guid.NewGuid(); + + @this.Write(CapBeforeReceiveMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageName = message.Name, + MessageContent = message.Content + }); + + return operationId; + } + return Guid.Empty; + } + + public static void WriteReceiveMessageStoreAfter(this DiagnosticListener @this, Guid operationId, CapReceivedMessage message, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapAfterReceiveMessageStore)) + { + @this.Write(CapAfterReceiveMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static void WriteReceiveMessageStoreError(this DiagnosticListener @this, Guid operationId, + CapReceivedMessage message, Exception ex, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapErrorReceiveMessageStore)) + { + @this.Write(CapErrorReceiveMessageStore, new + { + OperationId = operationId, + Operation = operation, + MessageId = message.Id, + MessageName = message.Name, + MessageContent = message.Content, + Exception = ex, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static Guid WriteConsumerInvokeBefore(this DiagnosticListener @this, ConsumerContext context, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapBeforeConsumerInvoke)) + { + Guid operationId = Guid.NewGuid(); + + @this.Write(CapBeforeConsumerInvoke, new + { + OperationId = operationId, + Operation = operation, + MethodName = context.ConsumerDescriptor.MethodInfo.Name, + ConsumerGroup = context.ConsumerDescriptor.Attribute.Group, + MessageName = context.DeliverMessage.Name, + MessageContent = context.DeliverMessage.Content, + Timestamp = Stopwatch.GetTimestamp() + }); + + return operationId; + } + return Guid.Empty; + } + + public static void WriteConsumerInvokeAfter(this DiagnosticListener @this, Guid operationId, ConsumerContext context, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapAfterConsumerInvoke)) + { + @this.Write(CapAfterConsumerInvoke, new + { + OperationId = operationId, + Operation = operation, + MethodName = context.ConsumerDescriptor.MethodInfo.Name, + ConsumerGroup = context.ConsumerDescriptor.Attribute.Group, + MessageName = context.DeliverMessage.Name, + MessageContent = context.DeliverMessage.Content, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + + public static void WriteConsumerInvokeError(this DiagnosticListener @this, Guid operationId, + ConsumerContext context, Exception ex, [CallerMemberName] string operation = "") + { + if (@this.IsEnabled(CapErrorConsumerInvoke)) + { + @this.Write(CapErrorConsumerInvoke, new + { + OperationId = operationId, + Operation = operation, + MethodName = context.ConsumerDescriptor.MethodInfo.Name, + ConsumerGroup = context.ConsumerDescriptor.Attribute.Group, + MessageName = context.DeliverMessage.Name, + MessageContent = context.DeliverMessage.Content, + Exception = ex, + Timestamp = Stopwatch.GetTimestamp() + }); + } + } + } +}