@@ -2,10 +2,8 @@ | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Diagnostics; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Processor.States; | |||
using Microsoft.Extensions.Logging; | |||
@@ -16,7 +14,6 @@ namespace DotNetCore.CAP.Kafka | |||
{ | |||
private readonly IConnectionPool _connectionPool; | |||
private readonly ILogger _logger; | |||
private readonly string _serversAddress; | |||
public KafkaPublishMessageSender( | |||
CapOptions options, IStateChanger stateChanger, IStorageConnection connection, | |||
@@ -25,21 +22,15 @@ namespace DotNetCore.CAP.Kafka | |||
{ | |||
_logger = logger; | |||
_connectionPool = connectionPool; | |||
_serversAddress = _connectionPool.ServersAddress; | |||
ServersAddress = _connectionPool.ServersAddress; | |||
} | |||
public override async Task<OperateResult> PublishAsync(string keyName, string content) | |||
{ | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
Guid operationId = Guid.Empty; | |||
var producer = _connectionPool.Rent(); | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _serversAddress); | |||
var contentBytes = Encoding.UTF8.GetBytes(content); | |||
var message = await producer.ProduceAsync(keyName, null, contentBytes); | |||
@@ -52,16 +43,12 @@ namespace DotNetCore.CAP.Kafka | |||
}); | |||
} | |||
s_diagnosticListener.WritePublishAfter(operationId, message.Topic, content, _serversAddress, startTime, stopwatch.Elapsed); | |||
_logger.LogDebug($"kafka topic message [{keyName}] has been published."); | |||
return OperateResult.Success; | |||
} | |||
catch (Exception ex) | |||
{ | |||
s_diagnosticListener.WritePublishError(operationId, keyName, content, _serversAddress, ex, startTime, stopwatch.Elapsed); | |||
var wapperEx = new PublisherSentFailedException(ex.Message, ex); | |||
return OperateResult.Failed(wapperEx); | |||
@@ -74,7 +61,6 @@ namespace DotNetCore.CAP.Kafka | |||
producer.Dispose(); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -29,6 +29,8 @@ namespace DotNetCore.CAP.Kafka | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public string ServersAddress => _kafkaOptions.Servers; | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
if (topics == null) | |||
@@ -2,10 +2,8 @@ | |||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||
using System; | |||
using System.Diagnostics; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Diagnostics; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Processor.States; | |||
using Microsoft.Extensions.Logging; | |||
@@ -18,7 +16,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private readonly IConnectionChannelPool _connectionChannelPool; | |||
private readonly ILogger _logger; | |||
private readonly string _exchange; | |||
private readonly string _hostAddress; | |||
public RabbitMQPublishMessageSender(ILogger<RabbitMQPublishMessageSender> logger, CapOptions options, | |||
IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger) | |||
@@ -27,33 +24,24 @@ namespace DotNetCore.CAP.RabbitMQ | |||
_logger = logger; | |||
_connectionChannelPool = connectionChannelPool; | |||
_exchange = _connectionChannelPool.Exchange; | |||
_hostAddress = _connectionChannelPool.HostAddress; | |||
ServersAddress = _connectionChannelPool.HostAddress; | |||
} | |||
public override Task<OperateResult> PublishAsync(string keyName, string content) | |||
{ | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
Guid operationId = Guid.Empty; | |||
var channel = _connectionChannelPool.Rent(); | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WritePublishBefore(keyName, content, _hostAddress); | |||
var body = Encoding.UTF8.GetBytes(content); | |||
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true); | |||
channel.BasicPublish(_exchange, keyName, null, body); | |||
s_diagnosticListener.WritePublishAfter(operationId, keyName, content, _hostAddress, startTime, stopwatch.Elapsed); | |||
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published."); | |||
return Task.FromResult(OperateResult.Success); | |||
} | |||
catch (Exception ex) | |||
{ | |||
s_diagnosticListener.WritePublishError(operationId, keyName, content, _hostAddress, ex, startTime, stopwatch.Elapsed); | |||
var wapperEx = new PublisherSentFailedException(ex.Message, ex); | |||
var errors = new OperateError | |||
{ | |||
@@ -37,6 +37,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public string ServersAddress => _rabbitMQOptions.HostName; | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
if (topics == null) | |||
@@ -174,7 +174,7 @@ namespace DotNetCore.CAP.Diagnostics | |||
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; | |||
var parameterValues = context.DeliverMessage.Content; | |||
@this.Write(CapBeforePublish, new SubscriberInvokeEventData(operationId, operation, methodName, | |||
@this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName, | |||
subscribeName, | |||
subscribeGroup, parameterValues, DateTimeOffset.UtcNow)); | |||
@@ -198,7 +198,7 @@ namespace DotNetCore.CAP.Diagnostics | |||
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; | |||
var parameterValues = context.DeliverMessage.Content; | |||
@this.Write(CapBeforePublish, new SubscriberInvokeEndEventData(operationId, operation, methodName, | |||
@this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName, | |||
subscribeName, | |||
subscribeGroup, parameterValues, startTime, duration)); | |||
} | |||
@@ -219,7 +219,7 @@ namespace DotNetCore.CAP.Diagnostics | |||
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group; | |||
var parameterValues = context.DeliverMessage.Content; | |||
@this.Write(CapBeforePublish, new SubscriberInvokeErrorEventData(operationId, operation, methodName, | |||
@this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName, | |||
subscribeName, | |||
subscribeGroup, parameterValues, ex, startTime, duration)); | |||
} | |||
@@ -13,6 +13,8 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public interface IConsumerClient : IDisposable | |||
{ | |||
string ServersAddress { get; } | |||
/// <summary> | |||
/// Subscribe to a set of topics to the message queue | |||
/// </summary> | |||
@@ -24,6 +24,7 @@ namespace DotNetCore.CAP | |||
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); | |||
private readonly MethodMatcherCache _selector; | |||
private string _serverAddress; | |||
private Task _compositeTask; | |||
private bool _disposed; | |||
@@ -56,6 +57,8 @@ namespace DotNetCore.CAP | |||
{ | |||
using (var client = _consumerClientFactory.Create(matchGroup.Key)) | |||
{ | |||
_serverAddress = client.ServersAddress; | |||
RegisterMessageProcessor(client); | |||
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); | |||
@@ -102,30 +105,24 @@ namespace DotNetCore.CAP | |||
{ | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
var operationId = Guid.Empty; | |||
var tracingResult = TracingBefore(messageContext.Name, messageContext.Content); | |||
var operationId = tracingResult.Item1; | |||
var messageBody = tracingResult.Item2; | |||
var receivedMessage = new CapReceivedMessage(messageContext) | |||
{ | |||
StatusName = StatusName.Scheduled | |||
StatusName = StatusName.Scheduled, | |||
Content = messageBody | |||
}; | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WriteReceiveMessageStoreBefore( | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group); | |||
StoreMessage(receivedMessage); | |||
client.Commit(); | |||
s_diagnosticListener.WriteReceiveMessageStoreAfter( | |||
operationId, | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group, | |||
startTime, | |||
TracingAfter(operationId, receivedMessage.Name, receivedMessage.Content, startTime, | |||
stopwatch.Elapsed); | |||
_dispatcher.EnqueueToExecute(receivedMessage); | |||
@@ -136,12 +133,7 @@ namespace DotNetCore.CAP | |||
client.Reject(); | |||
s_diagnosticListener.WriteReceiveMessageStoreError(operationId, | |||
messageContext.Name, | |||
messageContext.Content, | |||
messageContext.Group, | |||
e, | |||
startTime, | |||
TracingError(operationId, receivedMessage.Name, receivedMessage.Content, e, startTime, | |||
stopwatch.Elapsed); | |||
} | |||
}; | |||
@@ -183,5 +175,50 @@ namespace DotNetCore.CAP | |||
receivedMessage.Id = id; | |||
} | |||
private (Guid, string) TracingBefore(string topic, string values) | |||
{ | |||
Guid operationId = Guid.NewGuid(); | |||
var eventData = new BrokerConsumeEventData( | |||
operationId, "", | |||
_serverAddress, | |||
topic, | |||
values, | |||
DateTimeOffset.UtcNow); | |||
s_diagnosticListener.WriteConsumeBefore(eventData); | |||
return (operationId, eventData.BrokerTopicBody); | |||
} | |||
private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du) | |||
{ | |||
var eventData = new BrokerConsumeEndEventData( | |||
operationId, | |||
"", | |||
_serverAddress, | |||
topic, | |||
values, | |||
startTime, | |||
du); | |||
s_diagnosticListener.WriteConsumeAfter(eventData); | |||
} | |||
private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du) | |||
{ | |||
var eventData = new BrokerConsumeErrorEventData( | |||
operationId, | |||
"", | |||
_serverAddress, | |||
topic, | |||
values, | |||
ex, | |||
startTime, | |||
du); | |||
s_diagnosticListener.WriteConsumeError(eventData); | |||
} | |||
} | |||
} |
@@ -21,6 +21,8 @@ namespace DotNetCore.CAP | |||
private readonly CapOptions _options; | |||
private readonly IStateChanger _stateChanger; | |||
protected string ServersAddress { get; set; } | |||
// diagnostics listener | |||
// ReSharper disable once InconsistentNaming | |||
protected static readonly DiagnosticListener s_diagnosticListener = | |||
@@ -42,25 +44,35 @@ namespace DotNetCore.CAP | |||
public async Task<OperateResult> SendAsync(CapPublishedMessage message) | |||
{ | |||
var sp = Stopwatch.StartNew(); | |||
var startTime = DateTimeOffset.UtcNow; | |||
var stopwatch = Stopwatch.StartNew(); | |||
var tracingResult = TracingBefore(message.Name, message.Content); | |||
var operationId = tracingResult.Item1; | |||
var result = await PublishAsync(message.Name, message.Content); | |||
var sendValues = tracingResult.Item2 != null | |||
? Helper.AddTracingHeaderProperty(message.Content, tracingResult.Item2) | |||
: message.Content; | |||
sp.Stop(); | |||
var result = await PublishAsync(message.Name, sendValues); | |||
stopwatch.Stop(); | |||
if (result.Succeeded) | |||
{ | |||
await SetSuccessfulState(message); | |||
_logger.MessageHasBeenSent(sp.Elapsed.TotalSeconds); | |||
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed); | |||
return OperateResult.Success; | |||
} | |||
else | |||
{ | |||
TracingError(operationId, message.Name, sendValues, result.Exception, startTime, stopwatch.Elapsed); | |||
_logger.MessagePublishException(message.Id, result.Exception); | |||
await SetFailedState(message, result.Exception, out bool stillRetry); | |||
if (stillRetry) | |||
{ | |||
_logger.SenderRetrying(3); | |||
@@ -122,5 +134,51 @@ namespace DotNetCore.CAP | |||
{ | |||
message.Content = Helper.AddExceptionProperty(message.Content, exception); | |||
} | |||
private (Guid, TracingHeaders) TracingBefore(string topic, string values) | |||
{ | |||
Guid operationId = Guid.NewGuid(); | |||
var eventData = new BrokerPublishEventData( | |||
operationId, "", | |||
ServersAddress, topic, | |||
values, | |||
DateTimeOffset.UtcNow); | |||
s_diagnosticListener.WritePublishBefore(eventData); | |||
return (operationId, eventData.Headers); | |||
} | |||
private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du) | |||
{ | |||
var eventData = new BrokerPublishEndEventData( | |||
operationId, | |||
"", | |||
ServersAddress, | |||
topic, | |||
values, | |||
startTime, | |||
du); | |||
s_diagnosticListener.WritePublishAfter(eventData); | |||
_logger.MessageHasBeenSent(du.TotalSeconds); | |||
} | |||
private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du) | |||
{ | |||
var eventData = new BrokerPublishErrorEventData( | |||
operationId, | |||
"", | |||
ServersAddress, | |||
topic, | |||
values, | |||
ex, | |||
startTime, | |||
du); | |||
s_diagnosticListener.WritePublishError(eventData); | |||
} | |||
} | |||
} |
@@ -153,11 +153,11 @@ namespace DotNetCore.CAP | |||
try | |||
{ | |||
operationId = s_diagnosticListener.WriteConsumerInvokeBefore(consumerContext); | |||
operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext); | |||
var ret = await Invoker.InvokeAsync(consumerContext); | |||
s_diagnosticListener.WriteConsumerInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); | |||
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); | |||
if (!string.IsNullOrEmpty(ret.CallbackName)) | |||
{ | |||
@@ -166,7 +166,7 @@ namespace DotNetCore.CAP | |||
} | |||
catch (Exception ex) | |||
{ | |||
s_diagnosticListener.WriteConsumerInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); | |||
s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); | |||
throw new SubscriberExecutionFailedException(ex.Message, ex); | |||
} | |||