瀏覽代碼

Refacing diagnostics tracing module.

master
Savorboard 5 年之前
父節點
當前提交
fa9b4a3b05
共有 22 個文件被更改,包括 339 次插入617 次删除
  1. +2
    -0
      CAP.sln
  2. +31
    -0
      src/DotNetCore.CAP/Diagnostics/CapDiagnosticListenerNames.cs
  3. +0
    -229
      src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs
  4. +0
    -27
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.Consume.cs
  5. +0
    -19
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.ConsumeEnd.cs
  6. +0
    -27
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.ConsumeError.cs
  7. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.Publish.cs
  8. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.PublishEnd.cs
  9. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.PublishError.cs
  10. +0
    -23
      src/DotNetCore.CAP/Diagnostics/EventData.Broker.cs
  11. +33
    -0
      src/DotNetCore.CAP/Diagnostics/EventData.Cap.P.cs
  12. +41
    -0
      src/DotNetCore.CAP/Diagnostics/EventData.Cap.S.cs
  13. +0
    -35
      src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvoke.cs
  14. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvokeEnd.cs
  15. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvokeError.cs
  16. +0
    -20
      src/DotNetCore.CAP/Diagnostics/EventData.cs
  17. +0
    -12
      src/DotNetCore.CAP/Diagnostics/IErrorEventData.cs
  18. +0
    -44
      src/DotNetCore.CAP/Diagnostics/TracingHeaders.cs
  19. +65
    -6
      src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs
  20. +51
    -33
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  21. +48
    -32
      src/DotNetCore.CAP/Internal/IMessageSender.Default.cs
  22. +68
    -10
      src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs

+ 2
- 0
CAP.sln 查看文件

@@ -128,9 +128,11 @@ Global
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{77C0AC02-C44B-49D5-B969-7D5305FC20A5}.Release|Any CPU.Build.0 = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F}.Release|Any CPU.Build.0 = Release|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Debug|Any CPU.ActiveCfg = Debug|Any CPU


+ 31
- 0
src/DotNetCore.CAP/Diagnostics/CapDiagnosticListenerNames.cs 查看文件

@@ -0,0 +1,31 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace DotNetCore.CAP.Diagnostics
{
/// <summary>
/// Extension methods on the DiagnosticListener class to log CAP data
/// </summary>
public static class CapDiagnosticListenerNames
{
public const string DiagnosticListenerName = "CapDiagnosticListener";

private const string CapPrefix = "DotNetCore.CAP.";

public const string BeforePublishMessageStore = CapPrefix + "WritePublishMessageStoreBefore";
public const string AfterPublishMessageStore = CapPrefix + "WritePublishMessageStoreAfter";
public const string ErrorPublishMessageStore = CapPrefix + "WritePublishMessageStoreError";

public const string BeforePublish = CapPrefix + "WritePublishBefore";
public const string AfterPublish = CapPrefix + "WritePublishAfter";
public const string ErrorPublish = CapPrefix + "WritePublishError";

public const string BeforeConsume = CapPrefix + "WriteConsumeBefore";
public const string AfterConsume = CapPrefix + "WriteConsumeAfter";
public const string ErrorConsume = CapPrefix + "WriteConsumeError";

public const string BeforeSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeBefore";
public const string AfterSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeAfter";
public const string ErrorSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeError";
}
}

+ 0
- 229
src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs 查看文件

@@ -1,229 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
/// <summary>
/// Extension methods on the DiagnosticListener class to log CAP data
/// </summary>
public static class CapDiagnosticListenerExtensions
{
public const string DiagnosticListenerName = "CapDiagnosticListener";

private const string CapPrefix = "DotNetCore.CAP.";

public const string CapBeforePublishMessageStore = CapPrefix + "WritePublishMessageStoreBefore";
public const string CapAfterPublishMessageStore = CapPrefix + "WritePublishMessageStoreAfter";
public const string CapErrorPublishMessageStore = CapPrefix + "WritePublishMessageStoreError";

public const string CapBeforePublish = CapPrefix + "WritePublishBefore";
public const string CapAfterPublish = CapPrefix + "WritePublishAfter";
public const string CapErrorPublish = CapPrefix + "WritePublishError";

public const string CapBeforeConsume = CapPrefix + "WriteConsumeBefore";
public const string CapAfterConsume = CapPrefix + "WriteConsumeAfter";
public const string CapErrorConsume = CapPrefix + "WriteConsumeError";

public const string CapBeforeSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeBefore";
public const string CapAfterSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeAfter";
public const string CapErrorSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeError";


//============================================================================
//==================== Before publish store message ====================
//============================================================================
public static Guid WritePublishMessageStoreBefore(this DiagnosticListener @this,
Message message,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforePublishMessageStore))
{
var operationId = Guid.NewGuid();

@this.Write(CapBeforePublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
Message = message
});

return operationId;
}

return Guid.Empty;
}

public static void WritePublishMessageStoreAfter(this DiagnosticListener @this,
Guid operationId,
Message message,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterPublishMessageStore))
{
@this.Write(CapAfterPublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
Message = message,
Timestamp = Stopwatch.GetTimestamp()
});
}
}

public static void WritePublishMessageStoreError(this DiagnosticListener @this,
Guid operationId,
Message message,
Exception ex,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorPublishMessageStore))
{
@this.Write(CapErrorPublishMessageStore, new
{
OperationId = operationId,
Operation = operation,
Message = message,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
}
}


////============================================================================
////==================== Publish ====================
////============================================================================
//public static void WritePublishBefore(this DiagnosticListener @this, BrokerPublishEventData eventData)
//{
// if (@this.IsEnabled(CapBeforePublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapBeforePublish, eventData);
// }
//}

//public static void WritePublishAfter(this DiagnosticListener @this, BrokerPublishEndEventData eventData)
//{
// if (@this.IsEnabled(CapAfterPublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapAfterPublish, eventData);
// }
//}

//public static void WritePublishError(this DiagnosticListener @this, BrokerPublishErrorEventData eventData)
//{
// if (@this.IsEnabled(CapErrorPublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapErrorPublish, eventData);
// }
//}


//============================================================================
//==================== Consume ====================
//============================================================================
//public static Guid WriteConsumeBefore(this DiagnosticListener @this, BrokerConsumeEventData eventData)
//{
// if (@this.IsEnabled(CapBeforeConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapBeforeConsume, eventData);
// }

// return Guid.Empty;
//}

//public static void WriteConsumeAfter(this DiagnosticListener @this, BrokerConsumeEndEventData eventData)
//{
// if (@this.IsEnabled(CapAfterConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapAfterConsume, eventData);
// }
//}

//public static void WriteConsumeError(this DiagnosticListener @this, BrokerConsumeErrorEventData eventData)
//{
// if (@this.IsEnabled(CapErrorConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapErrorConsume, eventData);
// }
//}


//============================================================================
//==================== SubscriberInvoke ====================
//============================================================================
//public static Guid WriteSubscriberInvokeBefore(this DiagnosticListener @this,
// ConsumerContext context,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapBeforeSubscriberInvoke))
// {
// var operationId = Guid.NewGuid();

// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var values = context.DeliverMessage.Value;

// @this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, DateTimeOffset.UtcNow));

// return operationId;
// }

// return Guid.Empty;
//}

//public static void WriteSubscriberInvokeAfter(this DiagnosticListener @this,
// Guid operationId,
// ConsumerContext context,
// DateTimeOffset startTime,
// TimeSpan duration,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapAfterSubscriberInvoke))
// {
// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var values = context.DeliverMessage.Value;

// @this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, startTime, duration));
// }
//}

//public static void WriteSubscriberInvokeError(this DiagnosticListener @this,
// Guid operationId,
// ConsumerContext context,
// Exception ex,
// DateTimeOffset startTime,
// TimeSpan duration,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapErrorSubscriberInvoke))
// {
// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var parameterValues = context.DeliverMessage.Content;

// @this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, ex, startTime, duration));
// }
//}
}
}

+ 0
- 27
src/DotNetCore.CAP/Diagnostics/EventData.Broker.Consume.cs 查看文件

@@ -1,27 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeEventData
{
public BrokerConsumeEventData(Guid operationId,string brokerAddress, TransportMessage message, DateTimeOffset startTime)
{
OperationId = operationId;
StartTime = startTime;
BrokerAddress = brokerAddress;
Message = message;
}

public Guid OperationId { get; set; }

public string BrokerAddress { get; set; }

public TransportMessage Message { get; set; }

public DateTimeOffset StartTime { get; }
}
}

+ 0
- 19
src/DotNetCore.CAP/Diagnostics/EventData.Broker.ConsumeEnd.cs 查看文件

@@ -1,19 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeEndEventData : BrokerConsumeEventData
{
public BrokerConsumeEndEventData(Guid operationId, string operation, string brokerAddress, TransportMessage message, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, brokerAddress, message, startTime)
{
Duration = duration;
}

public TimeSpan Duration { get; }
}
}

+ 0
- 27
src/DotNetCore.CAP/Diagnostics/EventData.Broker.ConsumeError.cs 查看文件

@@ -1,27 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeErrorEventData : IErrorEventData
{
public BrokerConsumeErrorEventData(Guid operationId, string brokerAddress, TransportMessage message, Exception exception)
{
OperationId = operationId;
BrokerAddress = brokerAddress;
Message = message;
Exception = exception;
}

public Guid OperationId { get; set; }

public string BrokerAddress { get; }

public TransportMessage Message { get; }

public Exception Exception { get; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.Broker.Publish.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishEventData : BrokerEventData
{
public BrokerPublishEventData(Guid operationId, string operation, string brokerAddress,
Message message , DateTimeOffset startTime)
: base(operationId, operation, brokerAddress, message)
{
StartTime = startTime;
}

public DateTimeOffset StartTime { get; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.Broker.PublishEnd.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishEndEventData : BrokerPublishEventData
{
public BrokerPublishEndEventData(Guid operationId, string operation, string brokerAddress,
Message message, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, message, startTime)
{
Duration = duration;
}

public TimeSpan Duration { get; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.Broker.PublishError.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishErrorEventData : BrokerPublishEndEventData, IErrorEventData
{
public BrokerPublishErrorEventData(Guid operationId, string operation, string brokerAddress,
Message message, Exception exception, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, message, startTime, duration)
{
Exception = exception;
}

public Exception Exception { get; }
}
}

+ 0
- 23
src/DotNetCore.CAP/Diagnostics/EventData.Broker.cs 查看文件

@@ -1,23 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class BrokerEventData : EventData
{
public BrokerEventData(Guid operationId, string operation, string brokerAddress, Message message)
: base(operationId, operation)
{
BrokerAddress = brokerAddress;

Message = message;
}

public string BrokerAddress { get; set; }

public Message Message { get; set; }
}
}

+ 33
- 0
src/DotNetCore.CAP/Diagnostics/EventData.Cap.P.cs 查看文件

@@ -0,0 +1,33 @@
using System;
using DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Diagnostics
{
public class CapEventDataPubStore
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }

public Message Message { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
}

public class CapEventDataPubSend
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }

public TransportMessage TransportMessage { get; set; }

public string BrokerAddress { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
}
}

+ 41
- 0
src/DotNetCore.CAP/Diagnostics/EventData.Cap.S.cs 查看文件

@@ -0,0 +1,41 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Reflection;
using DotNetCore.CAP.Messages;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Diagnostics
{
public class CapEventDataSubStore
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }

public TransportMessage TransportMessage { get; set; }

public string BrokerAddress { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
}

public class CapEventDataSubExecute
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }

public Message Message { get; set; }

[CanBeNull]
public MethodInfo MethodInfo { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
}
}

+ 0
- 35
src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvoke.cs 查看文件

@@ -1,35 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace DotNetCore.CAP.Diagnostics
{
public class SubscriberInvokeEventData : EventData
{
public SubscriberInvokeEventData(Guid operationId,
string operation,
string methodName,
string subscribeName,
string subscribeGroup,
object values,
DateTimeOffset startTime)
: base(operationId, operation)
{
MethodName = methodName;
SubscribeName = subscribeName;
SubscribeGroup = subscribeGroup;
StartTime = startTime;
}

public DateTimeOffset StartTime { get; }

public string MethodName { get; set; }

public string SubscribeName { get; set; }

public string SubscribeGroup { get; set; }

public string Values { get; set; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvokeEnd.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace DotNetCore.CAP.Diagnostics
{
public class SubscriberInvokeEndEventData : SubscriberInvokeEventData
{
public SubscriberInvokeEndEventData(Guid operationId, string operation,
string methodName, string subscribeName, string subscribeGroup,
string parameterValues, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, methodName, subscribeName, subscribeGroup, parameterValues, startTime)
{
Duration = duration;
}

public TimeSpan Duration { get; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.SubscriberInvokeError.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace DotNetCore.CAP.Diagnostics
{
public class SubscriberInvokeErrorEventData : SubscriberInvokeEndEventData, IErrorEventData
{
public SubscriberInvokeErrorEventData(Guid operationId, string operation, string methodName,
string subscribeName, string subscribeGroup, string parameterValues, Exception exception,
DateTimeOffset startTime, TimeSpan duration) : base(operationId, operation, methodName, subscribeName,
subscribeGroup, parameterValues, startTime, duration)
{
Exception = exception;
}

public Exception Exception { get; }
}
}

+ 0
- 20
src/DotNetCore.CAP/Diagnostics/EventData.cs 查看文件

@@ -1,20 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace DotNetCore.CAP.Diagnostics
{
public class EventData
{
public EventData(Guid operationId, string operation)
{
OperationId = operationId;
Operation = operation;
}

public Guid OperationId { get; set; }

public string Operation { get; set; }
}
}

+ 0
- 12
src/DotNetCore.CAP/Diagnostics/IErrorEventData.cs 查看文件

@@ -1,12 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace DotNetCore.CAP.Diagnostics
{
public interface IErrorEventData
{
Exception Exception { get; }
}
}

+ 0
- 44
src/DotNetCore.CAP/Diagnostics/TracingHeaders.cs 查看文件

@@ -1,44 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace DotNetCore.CAP.Diagnostics
{
public class TracingHeaders : IEnumerable<KeyValuePair<string, string>>
{
private List<KeyValuePair<string, string>> _dataStore = new List<KeyValuePair<string, string>>();

public IEnumerator<KeyValuePair<string, string>> GetEnumerator()
{
return _dataStore.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public void Add(string name, string value)
{
_dataStore.Add(new KeyValuePair<string, string>(name, value));
}

public bool Contains(string name)
{
return _dataStore != null && _dataStore.Any(x => x.Key == name);
}

public void Remove(string name)
{
_dataStore?.RemoveAll(x => x.Key == name);
}

public void Cleaar()
{
_dataStore?.Clear();
}
}
}

+ 65
- 6
src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs 查看文件

@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Internal

// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public CapPublisher(IServiceProvider service)
{
@@ -62,16 +62,16 @@ namespace DotNetCore.CAP.Internal

var message = new Message(optionHeaders, value);

var operationId = default(Guid);
long? tracingTimestamp = null;
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
tracingTimestamp = TracingBefore(message);

if (Transaction.Value?.DbTransaction == null)
{
var mediumMessage = await _storage.StoreMessageAsync(name, message, cancellationToken: cancellationToken);

s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
TracingAfter(tracingTimestamp, message);

_dispatcher.EnqueueToPublish(mediumMessage);
}
@@ -81,7 +81,7 @@ namespace DotNetCore.CAP.Internal

var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction, cancellationToken);

s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
TracingAfter(tracingTimestamp, message);

transaction.AddToSent(mediumMessage);

@@ -93,7 +93,8 @@ namespace DotNetCore.CAP.Internal
}
catch (Exception e)
{
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
TracingError(tracingTimestamp, message, e);

throw;
}
}
@@ -113,5 +114,63 @@ namespace DotNetCore.CAP.Internal

return PublishAsync(name, value, header, cancellationToken);
}

#region tracing

private long? TracingBefore(Message message)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforePublishMessageStore))
{
var eventData = new CapEventDataPubStore()
{
OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Operation = message.GetName(),
Message = message
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforePublishMessageStore, eventData);

return eventData.OperationTimestamp;
}

return null;
}

private void TracingAfter(long? tracingTimestamp, Message message)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterPublishMessageStore))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataPubStore()
{
OperationTimestamp = now,
Operation = message.GetName(),
Message = message,
ElapsedTimeMs = now - tracingTimestamp.Value
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterPublishMessageStore, eventData);
}
}

private void TracingError(long? tracingTimestamp, Message message, Exception ex)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorPublishMessageStore))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataPubStore()
{
OperationTimestamp = now,
Operation = message.GetName(),
Message = message,
ElapsedTimeMs = now - tracingTimestamp.Value,
Exception = ex
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublishMessageStore, eventData);
}
}

#endregion
}
}

+ 51
- 33
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs 查看文件

@@ -37,7 +37,7 @@ namespace DotNetCore.CAP.Internal
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public ConsumerRegister(ILogger<ConsumerRegister> logger,
IOptions<CapOptions> options,
@@ -152,10 +152,10 @@ namespace DotNetCore.CAP.Internal
client.OnMessageReceived += async (sender, transportMessage) =>
{
_cts.Token.ThrowIfCancellationRequested();
Guid? operationId = null;
long? tracingTimestamp = null;
try
{
operationId = TracingBefore(transportMessage);
tracingTimestamp = TracingBefore(transportMessage, _serverAddress);

var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -191,10 +191,7 @@ namespace DotNetCore.CAP.Internal

client.Commit();

if (operationId != null)
{
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
TracingAfter(tracingTimestamp, transportMessage, _serverAddress);
}
else
{
@@ -203,10 +200,7 @@ namespace DotNetCore.CAP.Internal

client.Commit();

if (operationId != null)
{
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
TracingAfter(tracingTimestamp, transportMessage, _serverAddress);

_dispatcher.EnqueueToExecute(mediumMessage, executor);
}
@@ -217,10 +211,7 @@ namespace DotNetCore.CAP.Internal

client.Reject();

if (operationId != null)
{
TracingError(operationId.Value, transportMessage, e);
}
TracingError(tracingTimestamp, transportMessage, client.ServersAddress, e);
}
};

@@ -258,39 +249,66 @@ namespace DotNetCore.CAP.Internal
}
}

private Guid? TracingBefore(TransportMessage message)
#region tracing

private long? TracingBefore(TransportMessage message, string broker)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapBeforeConsume))
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforeConsume))
{
var operationId = Guid.NewGuid();

var eventData = new BrokerConsumeEventData(operationId, _serverAddress, message, DateTimeOffset.UtcNow);
var eventData = new CapEventDataSubStore()
{
OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message
};

s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapBeforeConsume, eventData);
s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforeConsume, eventData);

return operationId;
return eventData.OperationTimestamp;
}

return null;
}

private void TracingAfter(Guid operationId, Message message, DateTimeOffset startTime, TimeSpan du)
private void TracingAfter(long? tracingTimestamp, TransportMessage message, string broker)
{
//if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterConsume))
//{
// var eventData = new BrokerConsumeEndEventData(operationId, "", _serverAddress, message, startTime, du);

// s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapAfterConsume, eventData);
//}
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterConsume))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataSubStore()
{
OperationTimestamp = now,
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message,
ElapsedTimeMs = now - tracingTimestamp.Value
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterConsume, eventData);
}
}

private void TracingError(Guid operationId, TransportMessage message, Exception ex)
private void TracingError(long? tracingTimestamp, TransportMessage message, string broker, Exception ex)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapErrorConsume))
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorConsume))
{
var eventData = new BrokerConsumeErrorEventData(operationId, _serverAddress, message, ex);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapErrorConsume, eventData);
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

var eventData = new CapEventDataPubSend()
{
OperationTimestamp = now,
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message,
ElapsedTimeMs = now - tracingTimestamp.Value,
Exception = ex
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorConsume, eventData);
}
}

#endregion
}
}

+ 48
- 32
src/DotNetCore.CAP/Internal/IMessageSender.Default.cs 查看文件

@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.Internal

// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public MessageSender(
ILogger<MessageSender> logger,
@@ -61,32 +61,23 @@ namespace DotNetCore.CAP.Internal

private async Task<(bool, OperateResult)> SendWithoutRetryAsync(MediumMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var transportMsg = await _serializer.SerializeAsync(message.Origin);

var operationId = TracingBefore(message.Origin);
var tracingTimestamp = TracingBefore(transportMsg, _transport.Address);

var transportMsg = await _serializer.SerializeAsync(message.Origin);
var result = await _transport.SendAsync(transportMsg);

stopwatch.Stop();
if (result.Succeeded)
{
await SetSuccessfulState(message);

if (operationId != null)
{
TracingAfter(operationId.Value, message.Origin, startTime, stopwatch.Elapsed);
}
TracingAfter(tracingTimestamp, transportMsg, _transport.Address);

return (false, OperateResult.Success);
}
else
{
if (operationId != null)
{
TracingError(operationId.Value, message.Origin, result, startTime, stopwatch.Elapsed);
}
TracingError(tracingTimestamp, transportMsg, _transport.Address, result);

var needRetry = await SetFailedState(message, result.Exception);

@@ -144,42 +135,67 @@ namespace DotNetCore.CAP.Internal
return true;
}

private Guid? TracingBefore(Message message)
#region tracing

private long? TracingBefore(TransportMessage message, string broker)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapBeforePublish))
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforePublish))
{
var operationId = Guid.NewGuid();

var eventData = new BrokerPublishEventData(operationId, "",_transport.Address, message,DateTimeOffset.UtcNow);
var eventData = new CapEventDataPubSend()
{
OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message
};

s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapBeforePublish, eventData);
s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforePublish, eventData);

return operationId;
return eventData.OperationTimestamp;
}

return null;
}

private void TracingAfter(Guid operationId, Message message, DateTimeOffset startTime, TimeSpan du)
private void TracingAfter(long? tracingTimestamp, TransportMessage message, string broker)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterPublish))
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterPublish))
{
var eventData = new BrokerPublishEndEventData(operationId, "", _transport.Address, message, startTime, du);

s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapAfterPublish, eventData);
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataPubSend()
{
OperationTimestamp = now,
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message,
ElapsedTimeMs = now - tracingTimestamp.Value
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterPublish, eventData);
}
}

private void TracingError(Guid operationId, Message message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
private void TracingError(long? tracingTimestamp, TransportMessage message, string broker, OperateResult result)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterPublish))
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorPublish))
{
var ex = new PublisherSentFailedException(result.ToString(), result.Exception);
var eventData = new BrokerPublishErrorEventData(operationId, "", _transport.Address,
message, ex, startTime, du);
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapErrorPublish, eventData);
var eventData = new CapEventDataPubSend()
{
OperationTimestamp = now,
Operation = message.GetName(),
BrokerAddress = broker,
TransportMessage = message,
ElapsedTimeMs = now - tracingTimestamp.Value,
Exception = ex
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublish, eventData);
}
}
}

#endregion
}
}

+ 68
- 10
src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs 查看文件

@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
@@ -26,7 +27,7 @@ namespace DotNetCore.CAP.Internal
// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);

public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger,
@@ -52,6 +53,8 @@ namespace DotNetCore.CAP.Internal
$"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
_logger.LogError(error);

TracingError(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), message.Origin, null, new Exception(error));

return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error)));
}

@@ -168,19 +171,13 @@ namespace DotNetCore.CAP.Internal

private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;

var consumerContext = new ConsumerContext(descriptor, message.Origin);
var tracingTimestamp = TracingBefore(message.Origin, descriptor.MethodInfo);
try
{
// operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext);

var ret = await Invoker.InvokeAsync(consumerContext, cancellationToken);

// s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime,stopwatch.Elapsed);
TracingAfter(tracingTimestamp, message.Origin, descriptor.MethodInfo);

if (!string.IsNullOrEmpty(ret.CallbackName))
{
@@ -199,10 +196,71 @@ namespace DotNetCore.CAP.Internal
}
catch (Exception ex)
{
// s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);
TracingError(tracingTimestamp, message.Origin, descriptor.MethodInfo, ex);

throw new SubscriberExecutionFailedException(ex.Message, ex);
}
}

#region tracing

private long? TracingBefore(Message message, MethodInfo method)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforeSubscriberInvoke))
{
var eventData = new CapEventDataSubExecute()
{
OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Operation = message.GetName(),
Message = message,
MethodInfo = method
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforeSubscriberInvoke, eventData);

return eventData.OperationTimestamp;
}

return null;
}

private void TracingAfter(long? tracingTimestamp, Message message, MethodInfo method)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterSubscriberInvoke))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataSubExecute()
{
OperationTimestamp = now,
Operation = message.GetName(),
Message = message,
MethodInfo = method,
ElapsedTimeMs = now - tracingTimestamp.Value
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterSubscriberInvoke, eventData);
}
}

private void TracingError(long? tracingTimestamp, Message message, MethodInfo method, Exception ex)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorSubscriberInvoke))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var eventData = new CapEventDataSubExecute()
{
OperationTimestamp = now,
Operation = message.GetName(),
Message = message,
MethodInfo = method,
ElapsedTimeMs = now - tracingTimestamp.Value,
Exception = ex
};

s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorSubscriberInvoke, eventData);
}
}

#endregion
}
}

Loading…
取消
儲存