Browse Source

Add support for Diganostics

master
Savorboard 6 years ago
parent
commit
79e08d78cd
9 changed files with 133 additions and 58 deletions
  1. +2
    -16
      src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
  2. +2
    -0
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  3. +1
    -13
      src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs
  4. +2
    -0
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
  5. +3
    -3
      src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs
  6. +2
    -0
      src/DotNetCore.CAP/IConsumerClient.cs
  7. +56
    -19
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  8. +62
    -4
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  9. +3
    -3
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs

+ 2
- 16
src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs View File

@@ -2,10 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Diagnostics;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -16,7 +14,6 @@ namespace DotNetCore.CAP.Kafka
{ {
private readonly IConnectionPool _connectionPool; private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly string _serversAddress;


public KafkaPublishMessageSender( public KafkaPublishMessageSender(
CapOptions options, IStateChanger stateChanger, IStorageConnection connection, CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
@@ -25,21 +22,15 @@ namespace DotNetCore.CAP.Kafka
{ {
_logger = logger; _logger = logger;
_connectionPool = connectionPool; _connectionPool = connectionPool;
_serversAddress = _connectionPool.ServersAddress;
ServersAddress = _connectionPool.ServersAddress;
} }


public override async Task<OperateResult> PublishAsync(string keyName, string content) public override async Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Guid operationId = Guid.Empty;

var producer = _connectionPool.Rent(); var producer = _connectionPool.Rent();


try try
{ {
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress);

var contentBytes = Encoding.UTF8.GetBytes(content); var contentBytes = Encoding.UTF8.GetBytes(content);
var message = await producer.ProduceAsync(keyName, null, contentBytes); var message = await producer.ProduceAsync(keyName, null, contentBytes);


@@ -52,16 +43,12 @@ namespace DotNetCore.CAP.Kafka
}); });
} }


s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed);

_logger.LogDebug($"kafka topic message [{keyName}] has been published."); _logger.LogDebug($"kafka topic message [{keyName}] has been published.");


return OperateResult.Success; return OperateResult.Success;
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed);

var wapperEx = new PublisherSentFailedException(ex.Message, ex); var wapperEx = new PublisherSentFailedException(ex.Message, ex);


return OperateResult.Failed(wapperEx); return OperateResult.Failed(wapperEx);
@@ -74,7 +61,6 @@ namespace DotNetCore.CAP.Kafka
producer.Dispose(); producer.Dispose();
} }
} }
}

}
} }
} }

+ 2
- 0
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs View File

@@ -29,6 +29,8 @@ namespace DotNetCore.CAP.Kafka


public event EventHandler<LogMessageEventArgs> OnLog; public event EventHandler<LogMessageEventArgs> OnLog;


public string ServersAddress => _kafkaOptions.Servers;

public void Subscribe(IEnumerable<string> topics) public void Subscribe(IEnumerable<string> topics)
{ {
if (topics == null) if (topics == null)


+ 1
- 13
src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs View File

@@ -2,10 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Diagnostics;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -18,7 +16,6 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly IConnectionChannelPool _connectionChannelPool; private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly string _exchange; private readonly string _exchange;
private readonly string _hostAddress;


public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options, public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options,
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger) IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
@@ -27,33 +24,24 @@ namespace DotNetCore.CAP.RabbitMQ
_logger = logger; _logger = logger;
_connectionChannelPool = connectionChannelPool; _connectionChannelPool = connectionChannelPool;
_exchange = _connectionChannelPool.Exchange; _exchange = _connectionChannelPool.Exchange;
_hostAddress = _connectionChannelPool.HostAddress;
ServersAddress = _connectionChannelPool.HostAddress;
} }


public override Task<OperateResult> PublishAsync(string keyName, string content) public override Task<OperateResult> PublishAsync(string keyName, string content)
{ {
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
Guid operationId = Guid.Empty;

var channel = _connectionChannelPool.Rent(); var channel = _connectionChannelPool.Rent();
try try
{ {
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress);

var body = Encoding.UTF8.GetBytes(content); var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, null, body); channel.BasicPublish(_exchange, keyName, null, body);


s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); _logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published.");


return Task.FromResult(OperateResult.Success); return Task.FromResult(OperateResult.Success);
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed);

var wapperEx = new PublisherSentFailedException(ex.Message, ex); var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError var errors = new OperateError
{ {


+ 2
- 0
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs View File

@@ -37,6 +37,8 @@ namespace DotNetCore.CAP.RabbitMQ


public event EventHandler<LogMessageEventArgs> OnLog; public event EventHandler<LogMessageEventArgs> OnLog;


public string ServersAddress => _rabbitMQOptions.HostName;

public void Subscribe(IEnumerable<string> topics) public void Subscribe(IEnumerable<string> topics)
{ {
if (topics == null) if (topics == null)


+ 3
- 3
src/DotNetCore.CAP/Diagnostics/DiagnosticListenerExtensions.cs View File

@@ -174,7 +174,7 @@ namespace DotNetCore.CAP.Diagnostics
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content; var parameterValues = context.DeliverMessage.Content;


@this.Write(CapBeforePublish, new SubscriberInvokeEventData(operationId, operation, methodName,
@this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName,
subscribeName, subscribeName,
subscribeGroup, parameterValues, DateTimeOffset.UtcNow)); subscribeGroup, parameterValues, DateTimeOffset.UtcNow));


@@ -198,7 +198,7 @@ namespace DotNetCore.CAP.Diagnostics
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content; var parameterValues = context.DeliverMessage.Content;


@this.Write(CapBeforePublish, new SubscriberInvokeEndEventData(operationId, operation, methodName,
@this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName,
subscribeName, subscribeName,
subscribeGroup, parameterValues, startTime, duration)); subscribeGroup, parameterValues, startTime, duration));
} }
@@ -219,7 +219,7 @@ namespace DotNetCore.CAP.Diagnostics
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content; var parameterValues = context.DeliverMessage.Content;


@this.Write(CapBeforePublish, new SubscriberInvokeErrorEventData(operationId, operation, methodName,
@this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName,
subscribeName, subscribeName,
subscribeGroup, parameterValues, ex, startTime, duration)); subscribeGroup, parameterValues, ex, startTime, duration));
} }


+ 2
- 0
src/DotNetCore.CAP/IConsumerClient.cs View File

@@ -13,6 +13,8 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public interface IConsumerClient : IDisposable public interface IConsumerClient : IDisposable
{ {
string ServersAddress { get; }

/// <summary> /// <summary>
/// Subscribe to a set of topics to the message queue /// Subscribe to a set of topics to the message queue
/// </summary> /// </summary>


+ 56
- 19
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -24,6 +24,7 @@ namespace DotNetCore.CAP
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;


private string _serverAddress;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;


@@ -56,6 +57,8 @@ namespace DotNetCore.CAP
{ {
using (var client = _consumerClientFactory.Create(matchGroup.Key)) using (var client = _consumerClientFactory.Create(matchGroup.Key))
{ {
_serverAddress = client.ServersAddress;

RegisterMessageProcessor(client); RegisterMessageProcessor(client);


client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
@@ -102,30 +105,24 @@ namespace DotNetCore.CAP
{ {
var startTime = DateTimeOffset.UtcNow; var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;

var tracingResult = TracingBefore(messageContext.Name, messageContext.Content);
var operationId = tracingResult.Item1;
var messageBody = tracingResult.Item2;


var receivedMessage = new CapReceivedMessage(messageContext) var receivedMessage = new CapReceivedMessage(messageContext)
{ {
StatusName = StatusName.Scheduled
StatusName = StatusName.Scheduled,
Content = messageBody
}; };


try try
{ {
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore(
messageContext.Name,
messageContext.Content,
messageContext.Group);

StoreMessage(receivedMessage); StoreMessage(receivedMessage);


client.Commit(); client.Commit();


s_diagnosticListener.WriteReceiveMessageStoreAfter(
operationId,
messageContext.Name,
messageContext.Content,
messageContext.Group,
startTime,
TracingAfter(operationId, receivedMessage.Name, receivedMessage.Content, startTime,
stopwatch.Elapsed); stopwatch.Elapsed);


_dispatcher.EnqueueToExecute(receivedMessage); _dispatcher.EnqueueToExecute(receivedMessage);
@@ -136,12 +133,7 @@ namespace DotNetCore.CAP


client.Reject(); client.Reject();


s_diagnosticListener.WriteReceiveMessageStoreError(operationId,
messageContext.Name,
messageContext.Content,
messageContext.Group,
e,
startTime,
TracingError(operationId, receivedMessage.Name, receivedMessage.Content, e, startTime,
stopwatch.Elapsed); stopwatch.Elapsed);
} }
}; };
@@ -183,5 +175,50 @@ namespace DotNetCore.CAP


receivedMessage.Id = id; receivedMessage.Id = id;
} }

private (Guid, string) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();

var eventData = new BrokerConsumeEventData(
operationId, "",
_serverAddress,
topic,
values,
DateTimeOffset.UtcNow);

s_diagnosticListener.WriteConsumeBefore(eventData);

return (operationId, eventData.BrokerTopicBody);
}

private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du)
{
var eventData = new BrokerConsumeEndEventData(
operationId,
"",
_serverAddress,
topic,
values,
startTime,
du);

s_diagnosticListener.WriteConsumeAfter(eventData);
}

private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du)
{
var eventData = new BrokerConsumeErrorEventData(
operationId,
"",
_serverAddress,
topic,
values,
ex,
startTime,
du);

s_diagnosticListener.WriteConsumeError(eventData);
}
} }
} }

+ 62
- 4
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -21,6 +21,8 @@ namespace DotNetCore.CAP
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly IStateChanger _stateChanger; private readonly IStateChanger _stateChanger;


protected string ServersAddress { get; set; }

// diagnostics listener // diagnostics listener
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener = protected static readonly DiagnosticListener s_diagnosticListener =
@@ -42,25 +44,35 @@ namespace DotNetCore.CAP


public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{ {
var sp = Stopwatch.StartNew();
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();

var tracingResult = TracingBefore(message.Name, message.Content);
var operationId = tracingResult.Item1;


var result = await PublishAsync(message.Name, message.Content);
var sendValues = tracingResult.Item2 != null
? Helper.AddTracingHeaderProperty(message.Content, tracingResult.Item2)
: message.Content;


sp.Stop();
var result = await PublishAsync(message.Name, sendValues);


stopwatch.Stop();
if (result.Succeeded) if (result.Succeeded)
{ {
await SetSuccessfulState(message); await SetSuccessfulState(message);


_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds);
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);


return OperateResult.Success; return OperateResult.Success;
} }
else else
{ {
TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed);

_logger.MessagePublishException(message.Id, result.Exception); _logger.MessagePublishException(message.Id, result.Exception);


await SetFailedState(message, result.Exception, out bool stillRetry); await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry) if (stillRetry)
{ {
_logger.SenderRetrying(3); _logger.SenderRetrying(3);
@@ -122,5 +134,51 @@ namespace DotNetCore.CAP
{ {
message.Content = Helper.AddExceptionProperty(message.Content, exception); message.Content = Helper.AddExceptionProperty(message.Content, exception);
} }

private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();

var eventData = new BrokerPublishEventData(
operationId, "",
ServersAddress, topic,
values,
DateTimeOffset.UtcNow);

s_diagnosticListener.WritePublishBefore(eventData);

return (operationId, eventData.Headers);
}

private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du)
{
var eventData = new BrokerPublishEndEventData(
operationId,
"",
ServersAddress,
topic,
values,
startTime,
du);

s_diagnosticListener.WritePublishAfter(eventData);

_logger.MessageHasBeenSent(du.TotalSeconds);
}

private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du)
{
var eventData = new BrokerPublishErrorEventData(
operationId,
"",
ServersAddress,
topic,
values,
ex,
startTime,
du);

s_diagnosticListener.WritePublishError(eventData);
}
} }
} }

+ 3
- 3
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -153,11 +153,11 @@ namespace DotNetCore.CAP


try try
{ {
operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext);
operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext);


var ret = await Invoker.InvokeAsync(consumerContext); var ret = await Invoker.InvokeAsync(consumerContext);


s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed);
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed);


if (!string.IsNullOrEmpty(ret.CallbackName)) if (!string.IsNullOrEmpty(ret.CallbackName))
{ {
@@ -166,7 +166,7 @@ namespace DotNetCore.CAP
} }
catch (Exception ex) catch (Exception ex)
{ {
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);
s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);


throw new SubscriberExecutionFailedException(ex.Message, ex); throw new SubscriberExecutionFailedException(ex.Message, ex);
} }


Loading…
Cancel
Save