diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs index 587525a..4de003c 100644 --- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs +++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs @@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RabbitMQ _connectionActivator = CreateConnection(options); HostAddress = $"{options.HostName}:{options.Port}"; - Exchange = CapOptions.DefaultVersion == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}"; + Exchange = "v1" == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}"; _logger.LogDebug($"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, Password:{options.Password}, ExchangeName:{options.ExchangeName}'"); } diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index 81b6576..19ecbc9 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -3,6 +3,7 @@ using System; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; @@ -50,13 +51,13 @@ namespace DotNetCore.CAP private IConsumerInvoker Invoker { get; } - public async Task ExecuteAsync(CapReceivedMessage message) + public async Task ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken) { bool retry; OperateResult result; do { - var executedResult = await ExecuteWithoutRetryAsync(message); + var executedResult = await ExecuteWithoutRetryAsync(message, cancellationToken); result = executedResult.Item2; if (result == OperateResult.Success) { @@ -72,19 +73,22 @@ namespace DotNetCore.CAP /// Execute message consumption once. /// /// the message received of + /// /// Item1 is need still retry, Item2 is executed result. - private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message) + private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message, CancellationToken cancellationToken) { if (message == null) { throw new ArgumentNullException(nameof(message)); } + cancellationToken.ThrowIfCancellationRequested(); + try { var sp = Stopwatch.StartNew(); - await InvokeConsumerMethodAsync(message); + await InvokeConsumerMethodAsync(message, cancellationToken); sp.Stop(); @@ -160,7 +164,7 @@ namespace DotNetCore.CAP message.Content = Helper.AddExceptionProperty(message.Content, exception); } - private async Task InvokeConsumerMethodAsync(CapReceivedMessage receivedMessage) + private async Task InvokeConsumerMethodAsync(CapReceivedMessage receivedMessage, CancellationToken cancellationToken) { if (!_selector.TryGetTopicExecutor(receivedMessage.Name, receivedMessage.Group, out var executor)) @@ -179,15 +183,20 @@ namespace DotNetCore.CAP { operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext); - var ret = await Invoker.InvokeAsync(consumerContext); + var ret = await Invoker.InvokeAsync(consumerContext, cancellationToken); - s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); + s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, + stopwatch.Elapsed); if (!string.IsNullOrEmpty(ret.CallbackName)) { await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); } } + catch (OperationCanceledException) + { + //ignore + } catch (Exception ex) { s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); diff --git a/src/DotNetCore.CAP/ISubscriberExecutor.cs b/src/DotNetCore.CAP/ISubscriberExecutor.cs index 2222f78..daeb60f 100644 --- a/src/DotNetCore.CAP/ISubscriberExecutor.cs +++ b/src/DotNetCore.CAP/ISubscriberExecutor.cs @@ -1,16 +1,17 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Models; namespace DotNetCore.CAP { /// - /// Consumer execotor + /// Consumer executor /// public interface ISubscriberExecutor { - Task ExecuteAsync(CapReceivedMessage message); + Task ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index 8ec30dd..4392cd0 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -3,6 +3,7 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; using Microsoft.Extensions.DependencyInjection; @@ -29,8 +30,10 @@ namespace DotNetCore.CAP.Internal _logger = loggerFactory.CreateLogger(); } - public async Task InvokeAsync(ConsumerContext context) + public async Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); + _logger.LogDebug("Executing consumer Topic: {0}", context.ConsumerDescriptor.MethodInfo.Name); var executor = ObjectMethodExecutor.Create( diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs index 892c2e3..2dbf34c 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Threading; using System.Threading.Tasks; namespace DotNetCore.CAP.Internal @@ -14,6 +15,7 @@ namespace DotNetCore.CAP.Internal /// Invoke consumer method whit consumer context. /// /// consumer execute context - Task InvokeAsync(ConsumerContext context); + /// The object of . + Task InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index aadc2d7..ee036b1 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -57,11 +57,14 @@ namespace DotNetCore.CAP.Processor { while (!_publishedMessageQueue.IsCompleted) { - if (_publishedMessageQueue.TryTake(out var message, 100, _cts.Token)) + if (_publishedMessageQueue.TryTake(out var message, 3000, _cts.Token)) { try { - _sender.SendAsync(message); + Task.Run(async () => + { + await _sender.SendAsync(message); + }); } catch (Exception ex) { @@ -82,7 +85,7 @@ namespace DotNetCore.CAP.Processor { foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) { - _executor.ExecuteAsync(message); + _executor.ExecuteAsync(message, _cts.Token); } } catch (OperationCanceledException) diff --git a/src/DotNetCore.CAP/Processor/IProcessor.InfiniteRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.InfiniteRetry.cs index b7b5d43..9aba0e7 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.InfiniteRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.InfiniteRetry.cs @@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Processor } catch (OperationCanceledException) { - return; + //ignore } catch (Exception ex) { diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index 66c1b39..a7e92b3 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -40,15 +40,15 @@ namespace DotNetCore.CAP.Processor var connection = context.Provider.GetRequiredService(); - await Task.WhenAll( - ProcessPublishedAsync(connection, context), - ProcessReceivedAsync(connection, context)); + await Task.WhenAll(ProcessPublishedAsync(connection, context), ProcessReceivedAsync(connection, context)); await context.WaitAsync(_waitingInterval); } private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) { + context.ThrowIfStopping(); + var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry); foreach (var message in messages) @@ -61,17 +61,17 @@ namespace DotNetCore.CAP.Processor private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) { + context.ThrowIfStopping(); + var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry); foreach (var message in messages) { await _subscriberExecutor.ExecuteAsync(message); - context.ThrowIfStopping(); - await context.WaitAsync(_delay); } - } + } private async Task> GetSafelyAsync(Func>> getMessagesAsync) { diff --git a/test/DotNetCore.CAP.Test/ConsumerInvokerFactoryTest.cs b/test/DotNetCore.CAP.Test/ConsumerInvokerFactoryTest.cs index 0016815..00998ce 100644 --- a/test/DotNetCore.CAP.Test/ConsumerInvokerFactoryTest.cs +++ b/test/DotNetCore.CAP.Test/ConsumerInvokerFactoryTest.cs @@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Test services.AddSingleton(_mockSerialiser.Object); services.AddSingleton(_mockMessagePacker.Object); services.AddSingleton(_mockModelBinderFactory.Object); - _serviceProvider = services.BuildServiceProvider(); + _serviceProvider = services.BuildServiceProvider(); } private ConsumerInvokerFactory Create() => @@ -74,7 +74,7 @@ namespace DotNetCore.CAP.Test Assert.Throws(() => { invoker.InvokeAsync(context).GetAwaiter().GetResult(); - }); + }); } } } \ No newline at end of file