Browse Source

support event data for Diagnostics.

master
Savorboard 6 years ago
parent
commit
621179df38
6 changed files with 336 additions and 36 deletions
  1. +23
    -12
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  2. +1
    -0
      src/DotNetCore.CAP/DotNetCore.CAP.csproj
  3. +28
    -12
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  4. +19
    -9
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  5. +15
    -3
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
  6. +250
    -0
      src/DotNetCore.CAP/Internal/CapDiagnosticListenerExtensions.cs

+ 23
- 12
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -3,8 +3,10 @@


using System; using System;
using System.Data; using System.Data;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;


@@ -15,6 +17,11 @@ namespace DotNetCore.CAP.Abstractions
private readonly IDispatcher _dispatcher; private readonly IDispatcher _dispatcher;
private readonly ILogger _logger; private readonly ILogger _logger;


// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);

protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher) protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{ {
_logger = logger; _logger = logger;
@@ -75,13 +82,13 @@ namespace DotNetCore.CAP.Abstractions


protected virtual string Serialize<T>(T obj, string callbackName = null) protected virtual string Serialize<T>(T obj, string callbackName = null)
{ {
var packer = (IMessagePacker) ServiceProvider.GetService(typeof(IMessagePacker));
var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker));
string content; string content;
if (obj != null) if (obj != null)
{ {
if (Helper.IsComplexType(obj.GetType())) if (Helper.IsComplexType(obj.GetType()))
{ {
var serializer = (IContentSerializer) ServiceProvider.GetService(typeof(IContentSerializer));
var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
content = serializer.Serialize(obj); content = serializer.Serialize(obj);
} }
else else
@@ -179,16 +186,20 @@ namespace DotNetCore.CAP.Abstractions


private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null) private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{ {
try
Guid operationId = default(Guid);

var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{ {
var content = Serialize(contentObj, callbackName);
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};


var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);


var id = Execute(DbConnection, DbTransaction, message); var id = Execute(DbConnection, DbTransaction, message);


@@ -197,15 +208,15 @@ namespace DotNetCore.CAP.Abstractions
if (id > 0) if (id > 0)
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database."); _logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id; message.Id = id;

Enqueue(message); Enqueue(message);
} }
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e); _logger.LogError("An exception was occurred when publish message. exception message:" + e.Message, e);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e); Console.WriteLine(e);
throw; throw;
} }


+ 1
- 0
src/DotNetCore.CAP/DotNetCore.CAP.csproj View File

@@ -55,6 +55,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="System.Data.Common" Version="4.3.0" /> <PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> <PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />


+ 28
- 12
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -25,6 +26,11 @@ namespace DotNetCore.CAP
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;


// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);

public ConsumerHandler(IConsumerClientFactory consumerClientFactory, public ConsumerHandler(IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher, IDispatcher dispatcher,
IStorageConnection connection, IStorageConnection connection,
@@ -91,21 +97,34 @@ namespace DotNetCore.CAP


private void RegisterMessageProcessor(IConsumerClient client) private void RegisterMessageProcessor(IConsumerClient client)
{ {
client.OnMessageReceived += (sender, message) =>
client.OnMessageReceived += (sender, messageContext) =>
{ {
Guid operationId = default(Guid);

var receivedMessage = new CapReceivedMessage(messageContext)
{
StatusName = StatusName.Scheduled
};

try try
{ {
var storedMessage = StoreMessage(message);
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(receivedMessage);

StoreMessage(receivedMessage);


client.Commit(); client.Commit();


_dispatcher.EnqueueToExecute(storedMessage);
s_diagnosticListener.WriteReceiveMessageStoreAfter(operationId, receivedMessage);

_dispatcher.EnqueueToExecute(receivedMessage);
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.",
message);
_logger.LogError(e, "An exception occurred when storage received message. Message:'{0}'.", messageContext);
client.Reject(); client.Reject();

s_diagnosticListener.WriteReceiveMessageStoreError(operationId, receivedMessage, e);
} }
}; };


@@ -139,15 +158,12 @@ namespace DotNetCore.CAP
} }
} }


private CapReceivedMessage StoreMessage(MessageContext messageContext)
private void StoreMessage(CapReceivedMessage receivedMessage)
{ {
var receivedMessage = new CapReceivedMessage(messageContext)
{
StatusName = StatusName.Scheduled
};
var id = _connection.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult();
var id = _connection.StoreReceivedMessageAsync(receivedMessage)
.GetAwaiter().GetResult();

receivedMessage.Id = id; receivedMessage.Id = id;
return receivedMessage;
} }
} }
} }

+ 19
- 9
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -20,6 +20,11 @@ namespace DotNetCore.CAP
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;


// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);

protected BasePublishMessageSender( protected BasePublishMessageSender(
ILogger logger, ILogger logger,
CapOptions options, CapOptions options,
@@ -37,6 +42,7 @@ namespace DotNetCore.CAP
public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{ {
var sp = Stopwatch.StartNew(); var sp = Stopwatch.StartNew();
var operationId = s_diagnosticListener.WritePublishBefore(message);


var result = await PublishAsync(message.Name, message.Content); var result = await PublishAsync(message.Name, message.Content);


@@ -45,22 +51,26 @@ namespace DotNetCore.CAP
if (result.Succeeded) if (result.Succeeded)
{ {
await SetSuccessfulState(message); await SetSuccessfulState(message);

s_diagnosticListener.WritePublishAfter(operationId, message);
_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); _logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds);


return OperateResult.Success; return OperateResult.Success;
} }

_logger.MessagePublishException(message.Id, result.Exception);

await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry)
else
{ {
_logger.SenderRetrying(3);
s_diagnosticListener.WritePublishError(operationId, message, result.Exception);
_logger.MessagePublishException(message.Id, result.Exception);


await SendAsync(message);
await SetFailedState(message, result.Exception, out bool stillRetry);
if (stillRetry)
{
_logger.SenderRetrying(3);

await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
} }
return OperateResult.Failed(result.Exception);
} }


private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)


+ 15
- 3
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -18,10 +18,14 @@ namespace DotNetCore.CAP
private readonly ICallbackMessageSender _callbackMessageSender; private readonly ICallbackMessageSender _callbackMessageSender;
private readonly IStorageConnection _connection; private readonly IStorageConnection _connection;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IStateChanger _stateChanger;
private readonly CapOptions _options; private readonly CapOptions _options;

private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly IStateChanger _stateChanger;

// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);


public DefaultSubscriberExecutor( public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger, ILogger<DefaultSubscriberExecutor> logger,
@@ -139,12 +143,18 @@ namespace DotNetCore.CAP
var error = $"message can not be found subscriber, Message:{receivedMessage},\r\n see: https://github.com/dotnetcore/CAP/issues/63"; var error = $"message can not be found subscriber, Message:{receivedMessage},\r\n see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error); throw new SubscriberNotFoundException(error);
} }

Guid operationId = default(Guid);
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());

try try
{ {
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext);


var ret = await Invoker.InvokeAsync(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext);


s_diagnosticListener.WriteConsumerInvokeAfter(operationId,consumerContext);

if (!string.IsNullOrEmpty(ret.CallbackName)) if (!string.IsNullOrEmpty(ret.CallbackName))
{ {
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result);
@@ -152,6 +162,8 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex);

throw new SubscriberExecutionFailedException(ex.Message, ex); throw new SubscriberExecutionFailedException(ex.Message, ex);
} }
} }


+ 250
- 0
src/DotNetCore.CAP/Internal/CapDiagnosticListenerExtensions.cs View File

@@ -0,0 +1,250 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Internal
{
/// <summary>
/// Extension methods on the DiagnosticListener class to log CAP data
/// </summary>
internal static class CapDiagnosticListenerExtensions
{
public const string DiagnosticListenerName = "CapDiagnosticListener";

private const string CapPrefix = "DotNetCore.CAP.";

public const string CapBeforePublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreBefore);
public const string CapAfterPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreAfter);
public const string CapErrorPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreError);

public const string CapBeforePublish = CapPrefix + nameof(WritePublishBefore);
public const string CapAfterPublish = CapPrefix + nameof(WritePublishAfter);
public const string CapErrorPublish = CapPrefix + nameof(WritePublishError);

public const string CapBeforeReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreBefore);
public const string CapAfterReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreAfter);
public const string CapErrorReceiveMessageStore = CapPrefix + nameof(WriteReceiveMessageStoreError);

public const string CapBeforeConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeBefore);
public const string CapAfterConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeAfter);
public const string CapErrorConsumerInvoke = CapPrefix + nameof(WriteConsumerInvokeError);

public static Guid WritePublishMessageStoreBefore(this DiagnosticListener @this, CapPublishedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforePublishMessageStore))
{
Guid operationId = Guid.NewGuid();

@this.Write(CapBeforePublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageName = message.Name,
MessageContent = message.Content
});

return operationId;
}
return Guid.Empty;
}

public static void WritePublishMessageStoreAfter(this DiagnosticListener @this, Guid operationId, CapPublishedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterPublishMessageStore))
{
@this.Write(CapAfterPublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static void WritePublishMessageStoreError(this DiagnosticListener @this, Guid operationId,
CapPublishedMessage message, Exception ex, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorPublishMessageStore))
{
@this.Write(CapErrorPublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageName = message.Name,
MessageContent = message.Content,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static Guid WritePublishBefore(this DiagnosticListener @this, CapPublishedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforePublish))
{
Guid operationId = Guid.NewGuid();

@this.Write(CapBeforePublish, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content
});

return operationId;
}
return Guid.Empty;
}

public static void WritePublishAfter(this DiagnosticListener @this, Guid operationId, CapPublishedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterPublish))
{
@this.Write(CapAfterPublish, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static void WritePublishError(this DiagnosticListener @this, Guid operationId,
CapPublishedMessage message, Exception ex, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorPublish))
{
@this.Write(CapErrorPublish, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static Guid WriteReceiveMessageStoreBefore(this DiagnosticListener @this, CapReceivedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforeReceiveMessageStore))
{
Guid operationId = Guid.NewGuid();

@this.Write(CapBeforeReceiveMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageName = message.Name,
MessageContent = message.Content
});

return operationId;
}
return Guid.Empty;
}

public static void WriteReceiveMessageStoreAfter(this DiagnosticListener @this, Guid operationId, CapReceivedMessage message, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterReceiveMessageStore))
{
@this.Write(CapAfterReceiveMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static void WriteReceiveMessageStoreError(this DiagnosticListener @this, Guid operationId,
CapReceivedMessage message, Exception ex, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorReceiveMessageStore))
{
@this.Write(CapErrorReceiveMessageStore, new
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static Guid WriteConsumerInvokeBefore(this DiagnosticListener @this, ConsumerContext context, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforeConsumerInvoke))
{
Guid operationId = Guid.NewGuid();

@this.Write(CapBeforeConsumerInvoke, new
{
OperationId = operationId,
Operation = operation,
MethodName = context.ConsumerDescriptor.MethodInfo.Name,
ConsumerGroup = context.ConsumerDescriptor.Attribute.Group,
MessageName = context.DeliverMessage.Name,
MessageContent = context.DeliverMessage.Content,
Timestamp = Stopwatch.GetTimestamp()
});

return operationId;
}
return Guid.Empty;
}

public static void WriteConsumerInvokeAfter(this DiagnosticListener @this, Guid operationId, ConsumerContext context, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterConsumerInvoke))
{
@this.Write(CapAfterConsumerInvoke, new
{
OperationId = operationId,
Operation = operation,
MethodName = context.ConsumerDescriptor.MethodInfo.Name,
ConsumerGroup = context.ConsumerDescriptor.Attribute.Group,
MessageName = context.DeliverMessage.Name,
MessageContent = context.DeliverMessage.Content,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static void WriteConsumerInvokeError(this DiagnosticListener @this, Guid operationId,
ConsumerContext context, Exception ex, [CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorConsumerInvoke))
{
@this.Write(CapErrorConsumerInvoke, new
{
OperationId = operationId,
Operation = operation,
MethodName = context.ConsumerDescriptor.MethodInfo.Name,
ConsumerGroup = context.ConsumerDescriptor.Attribute.Group,
MessageName = context.DeliverMessage.Name,
MessageContent = context.DeliverMessage.Content,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
}
}
}
}

Loading…
Cancel
Save