From 0603833cf88ea04452eb9e077570f30dcca99da3 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 18 Apr 2020 22:23:07 +0800 Subject: [PATCH] Refactored FailedThresholdCallback parameters to support getting the ServiceProvider. #522 --- samples/Sample.Kafka.InMemory/Startup.cs | 2 +- samples/Sample.RabbitMQ.MySql/Startup.cs | 11 +++-- samples/Sample.RabbitMQ.SqlServer/Startup.cs | 8 ++-- .../ITransport.RabbitMQ.cs | 14 ++++-- src/DotNetCore.CAP/CAP.Options.cs | 2 +- .../Internal/IConsumerRegister.Default.cs | 47 ++++++++++++------- .../Internal/IMessageSender.Default.cs | 27 +++++++---- .../Internal/ISubscribeDispatcher.Default.cs | 7 ++- src/DotNetCore.CAP/Messages/FailedInfo.cs | 13 +++++ 9 files changed, 89 insertions(+), 42 deletions(-) create mode 100644 src/DotNetCore.CAP/Messages/FailedInfo.cs diff --git a/samples/Sample.Kafka.InMemory/Startup.cs b/samples/Sample.Kafka.InMemory/Startup.cs index 281ef24..ae3ee2a 100644 --- a/samples/Sample.Kafka.InMemory/Startup.cs +++ b/samples/Sample.Kafka.InMemory/Startup.cs @@ -10,7 +10,7 @@ namespace Sample.Kafka.InMemory services.AddCap(x => { x.UseInMemoryStorage(); - x.UseKafka("192.168.2.120:9093"); + x.UseKafka("localhost:9092"); x.UseDashboard(); }); diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs index fdcbe23..6b31ef5 100644 --- a/samples/Sample.RabbitMQ.MySql/Startup.cs +++ b/samples/Sample.RabbitMQ.MySql/Startup.cs @@ -1,7 +1,7 @@ -using System; -using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Sample.RabbitMQ.MySql { @@ -17,10 +17,11 @@ namespace Sample.RabbitMQ.MySql x.UseRabbitMQ("localhost"); x.UseDashboard(); x.FailedRetryCount = 5; - x.FailedThresholdCallback = (type, msg) => + x.FailedThresholdCallback = failed => { - Console.WriteLine( - $@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); + var logger = failed.ServiceProvider.GetService>(); + logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, + requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); }; }); diff --git a/samples/Sample.RabbitMQ.SqlServer/Startup.cs b/samples/Sample.RabbitMQ.SqlServer/Startup.cs index 38b91f7..a309637 100644 --- a/samples/Sample.RabbitMQ.SqlServer/Startup.cs +++ b/samples/Sample.RabbitMQ.SqlServer/Startup.cs @@ -2,6 +2,7 @@ using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Sample.RabbitMQ.SqlServer { @@ -17,10 +18,11 @@ namespace Sample.RabbitMQ.SqlServer x.UseRabbitMQ("192.168.2.120"); x.UseDashboard(); x.FailedRetryCount = 5; - x.FailedThresholdCallback = (type, msg) => + x.FailedThresholdCallback = failed => { - Console.WriteLine( - $@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); + var logger = failed.ServiceProvider.GetService>(); + logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, + requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); }; }); diff --git a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs index 39c321e..d43bdbd 100644 --- a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs +++ b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs @@ -32,9 +32,10 @@ namespace DotNetCore.CAP.RabbitMQ public Task SendAsync(TransportMessage message) { - var channel = _connectionChannelPool.Rent(); + IModel channel = null; try { + channel = _connectionChannelPool.Rent(); var props = new BasicProperties { DeliveryMode = 2, @@ -62,12 +63,15 @@ namespace DotNetCore.CAP.RabbitMQ } finally { - var returned = _connectionChannelPool.Return(channel); - if (!returned) + if (channel != null) { - channel.Dispose(); + var returned = _connectionChannelPool.Return(channel); + if (!returned) + { + channel.Dispose(); + } } } - } + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index ea178d1..ebdbf91 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -53,7 +53,7 @@ namespace DotNetCore.CAP /// /// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals times. /// - public Action FailedThresholdCallback { get; set; } + public Action FailedThresholdCallback { get; set; } /// /// The number of message retries, the retry will stop when the threshold is reached. diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 56b4c50..980b70a 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -12,6 +12,7 @@ using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -19,14 +20,16 @@ namespace DotNetCore.CAP.Internal { internal class ConsumerRegister : IConsumerRegister { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IConsumerClientFactory _consumerClientFactory; private readonly IDispatcher _dispatcher; private readonly ISerializer _serializer; private readonly IDataStorage _storage; - private readonly ILogger _logger; + private readonly MethodMatcherCache _selector; private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly CapOptions _options; - private readonly MethodMatcherCache _selector; private CancellationTokenSource _cts; private BrokerAddress _serverAddress; @@ -39,21 +42,17 @@ namespace DotNetCore.CAP.Internal private static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); - public ConsumerRegister(ILogger logger, - IOptions options, - MethodMatcherCache selector, - IConsumerClientFactory consumerClientFactory, - IDispatcher dispatcher, - ISerializer serializer, - IDataStorage storage) + public ConsumerRegister(ILogger logger, IServiceProvider serviceProvider) { - _options = options.Value; - _selector = selector; _logger = logger; - _consumerClientFactory = consumerClientFactory; - _dispatcher = dispatcher; - _serializer = serializer; - _storage = storage; + _serviceProvider = serviceProvider; + + _options = serviceProvider.GetService>().Value; + _selector = serviceProvider.GetService(); + _consumerClientFactory = serviceProvider.GetService(); + _dispatcher = serviceProvider.GetService(); + _serializer = serviceProvider.GetService(); + _storage = serviceProvider.GetService(); _cts = new CancellationTokenSource(); } @@ -152,7 +151,7 @@ namespace DotNetCore.CAP.Internal client.OnMessageReceived += async (sender, transportMessage) => { _logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName()); - + long? tracingTimestamp = null; try { @@ -194,6 +193,22 @@ namespace DotNetCore.CAP.Internal client.Commit(sender); + try + { + _options.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _serviceProvider, + MessageType = MessageType.Subscribe, + Message = message + }); + + _logger.ConsumerExecutedAfterThreshold(message.GetId(), _options.FailedRetryCount); + } + catch (Exception e) + { + _logger.ExecutedThresholdCallbackFailed(e); + } + TracingAfter(tracingTimestamp, transportMessage, _serverAddress); } else diff --git a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs index d256b2d..39c619c 100644 --- a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs +++ b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs @@ -9,6 +9,7 @@ using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,10 +17,12 @@ namespace DotNetCore.CAP.Internal { internal class MessageSender : IMessageSender { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IDataStorage _dataStorage; private readonly ISerializer _serializer; private readonly ITransport _transport; - private readonly ILogger _logger; private readonly IOptions _options; // ReSharper disable once InconsistentNaming @@ -28,16 +31,15 @@ namespace DotNetCore.CAP.Internal public MessageSender( ILogger logger, - IOptions options, - IDataStorage dataStorage, - ISerializer serializer, - ITransport transport) + IServiceProvider serviceProvider) { - _options = options; - _dataStorage = dataStorage; - _serializer = serializer; - _transport = transport; _logger = logger; + _serviceProvider = serviceProvider; + + _options = serviceProvider.GetService>(); + _dataStorage = serviceProvider.GetService(); + _serializer = serviceProvider.GetService(); + _transport = serviceProvider.GetService(); } public async Task SendAsync(MediumMessage message) @@ -111,7 +113,12 @@ namespace DotNetCore.CAP.Internal { try { - _options.Value.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Origin); + _options.Value.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _serviceProvider, + MessageType = MessageType.Publish, + Message = message.Origin + }); _logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount); } diff --git a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index a538a7e..1d1b9b3 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -145,7 +145,12 @@ namespace DotNetCore.CAP.Internal { try { - _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Origin); + _options.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _provider, + MessageType = MessageType.Subscribe, + Message = message.Origin + }); _logger.ConsumerExecutedAfterThreshold(message.DbId, _options.FailedRetryCount); } diff --git a/src/DotNetCore.CAP/Messages/FailedInfo.cs b/src/DotNetCore.CAP/Messages/FailedInfo.cs new file mode 100644 index 0000000..501307e --- /dev/null +++ b/src/DotNetCore.CAP/Messages/FailedInfo.cs @@ -0,0 +1,13 @@ +using System; + +namespace DotNetCore.CAP.Messages +{ + public class FailedInfo + { + public IServiceProvider ServiceProvider { get; set; } + + public MessageType MessageType { get; set; } + + public Message Message { get; set; } + } +}