@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
_connectionActivator = CreateConnection(options); | _connectionActivator = CreateConnection(options); | ||||
HostAddress = $"{options.HostName}:{options.Port}"; | HostAddress = $"{options.HostName}:{options.Port}"; | ||||
Exchange = CapOptions.DefaultVersion == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}"; | |||||
Exchange = "v1" == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}"; | |||||
_logger.LogDebug($"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, Password:{options.Password}, ExchangeName:{options.ExchangeName}'"); | _logger.LogDebug($"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, Password:{options.Password}, ExchangeName:{options.ExchangeName}'"); | ||||
} | } | ||||
@@ -3,6 +3,7 @@ | |||||
using System; | using System; | ||||
using System.Diagnostics; | using System.Diagnostics; | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Diagnostics; | using DotNetCore.CAP.Diagnostics; | ||||
using DotNetCore.CAP.Infrastructure; | using DotNetCore.CAP.Infrastructure; | ||||
@@ -50,13 +51,13 @@ namespace DotNetCore.CAP | |||||
private IConsumerInvoker Invoker { get; } | private IConsumerInvoker Invoker { get; } | ||||
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message) | |||||
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken) | |||||
{ | { | ||||
bool retry; | bool retry; | ||||
OperateResult result; | OperateResult result; | ||||
do | do | ||||
{ | { | ||||
var executedResult = await ExecuteWithoutRetryAsync(message); | |||||
var executedResult = await ExecuteWithoutRetryAsync(message, cancellationToken); | |||||
result = executedResult.Item2; | result = executedResult.Item2; | ||||
if (result == OperateResult.Success) | if (result == OperateResult.Success) | ||||
{ | { | ||||
@@ -72,19 +73,22 @@ namespace DotNetCore.CAP | |||||
/// Execute message consumption once. | /// Execute message consumption once. | ||||
/// </summary> | /// </summary> | ||||
/// <param name="message">the message received of <see cref="CapReceivedMessage"/></param> | /// <param name="message">the message received of <see cref="CapReceivedMessage"/></param> | ||||
/// <param name="cancellationToken"></param> | |||||
/// <returns>Item1 is need still retry, Item2 is executed result.</returns> | /// <returns>Item1 is need still retry, Item2 is executed result.</returns> | ||||
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message) | |||||
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message, CancellationToken cancellationToken) | |||||
{ | { | ||||
if (message == null) | if (message == null) | ||||
{ | { | ||||
throw new ArgumentNullException(nameof(message)); | throw new ArgumentNullException(nameof(message)); | ||||
} | } | ||||
cancellationToken.ThrowIfCancellationRequested(); | |||||
try | try | ||||
{ | { | ||||
var sp = Stopwatch.StartNew(); | var sp = Stopwatch.StartNew(); | ||||
await InvokeConsumerMethodAsync(message); | |||||
await InvokeConsumerMethodAsync(message, cancellationToken); | |||||
sp.Stop(); | sp.Stop(); | ||||
@@ -160,7 +164,7 @@ namespace DotNetCore.CAP | |||||
message.Content = Helper.AddExceptionProperty(message.Content, exception); | message.Content = Helper.AddExceptionProperty(message.Content, exception); | ||||
} | } | ||||
private async Task InvokeConsumerMethodAsync(CapReceivedMessage receivedMessage) | |||||
private async Task InvokeConsumerMethodAsync(CapReceivedMessage receivedMessage, CancellationToken cancellationToken) | |||||
{ | { | ||||
if (!_selector.TryGetTopicExecutor(receivedMessage.Name, receivedMessage.Group, | if (!_selector.TryGetTopicExecutor(receivedMessage.Name, receivedMessage.Group, | ||||
out var executor)) | out var executor)) | ||||
@@ -179,15 +183,20 @@ namespace DotNetCore.CAP | |||||
{ | { | ||||
operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext); | operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext); | ||||
var ret = await Invoker.InvokeAsync(consumerContext); | |||||
var ret = await Invoker.InvokeAsync(consumerContext, cancellationToken); | |||||
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, stopwatch.Elapsed); | |||||
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime, | |||||
stopwatch.Elapsed); | |||||
if (!string.IsNullOrEmpty(ret.CallbackName)) | if (!string.IsNullOrEmpty(ret.CallbackName)) | ||||
{ | { | ||||
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); | await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); | ||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | |||||
{ | |||||
//ignore | |||||
} | |||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); | s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed); | ||||
@@ -1,16 +1,17 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | // Copyright (c) .NET Core Community. All rights reserved. | ||||
// Licensed under the MIT License. See License.txt in the project root for license information. | // Licensed under the MIT License. See License.txt in the project root for license information. | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Models; | using DotNetCore.CAP.Models; | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// Consumer execotor | |||||
/// Consumer executor | |||||
/// </summary> | /// </summary> | ||||
public interface ISubscriberExecutor | public interface ISubscriberExecutor | ||||
{ | { | ||||
Task<OperateResult> ExecuteAsync(CapReceivedMessage message); | |||||
Task<OperateResult> ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken = default); | |||||
} | } | ||||
} | } |
@@ -3,6 +3,7 @@ | |||||
using System; | using System; | ||||
using System.Linq; | using System.Linq; | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using DotNetCore.CAP.Abstractions; | using DotNetCore.CAP.Abstractions; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
@@ -29,8 +30,10 @@ namespace DotNetCore.CAP.Internal | |||||
_logger = loggerFactory.CreateLogger<DefaultConsumerInvoker>(); | _logger = loggerFactory.CreateLogger<DefaultConsumerInvoker>(); | ||||
} | } | ||||
public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context) | |||||
public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default) | |||||
{ | { | ||||
cancellationToken.ThrowIfCancellationRequested(); | |||||
_logger.LogDebug("Executing consumer Topic: {0}", context.ConsumerDescriptor.MethodInfo.Name); | _logger.LogDebug("Executing consumer Topic: {0}", context.ConsumerDescriptor.MethodInfo.Name); | ||||
var executor = ObjectMethodExecutor.Create( | var executor = ObjectMethodExecutor.Create( | ||||
@@ -1,6 +1,7 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | // Copyright (c) .NET Core Community. All rights reserved. | ||||
// Licensed under the MIT License. See License.txt in the project root for license information. | // Licensed under the MIT License. See License.txt in the project root for license information. | ||||
using System.Threading; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace DotNetCore.CAP.Internal | namespace DotNetCore.CAP.Internal | ||||
@@ -14,6 +15,7 @@ namespace DotNetCore.CAP.Internal | |||||
/// Invoke consumer method whit consumer context. | /// Invoke consumer method whit consumer context. | ||||
/// </summary> | /// </summary> | ||||
/// <param name="context">consumer execute context</param> | /// <param name="context">consumer execute context</param> | ||||
Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context); | |||||
/// <param name="cancellationToken">The object of <see cref="CancellationToken"/>.</param> | |||||
Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default); | |||||
} | } | ||||
} | } |
@@ -57,11 +57,14 @@ namespace DotNetCore.CAP.Processor | |||||
{ | { | ||||
while (!_publishedMessageQueue.IsCompleted) | while (!_publishedMessageQueue.IsCompleted) | ||||
{ | { | ||||
if (_publishedMessageQueue.TryTake(out var message, 100, _cts.Token)) | |||||
if (_publishedMessageQueue.TryTake(out var message, 3000, _cts.Token)) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
_sender.SendAsync(message); | |||||
Task.Run(async () => | |||||
{ | |||||
await _sender.SendAsync(message); | |||||
}); | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
@@ -82,7 +85,7 @@ namespace DotNetCore.CAP.Processor | |||||
{ | { | ||||
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) | foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) | ||||
{ | { | ||||
_executor.ExecuteAsync(message); | |||||
_executor.ExecuteAsync(message, _cts.Token); | |||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Processor | |||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
{ | { | ||||
return; | |||||
//ignore | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
@@ -40,15 +40,15 @@ namespace DotNetCore.CAP.Processor | |||||
var connection = context.Provider.GetRequiredService<IStorageConnection>(); | var connection = context.Provider.GetRequiredService<IStorageConnection>(); | ||||
await Task.WhenAll( | |||||
ProcessPublishedAsync(connection, context), | |||||
ProcessReceivedAsync(connection, context)); | |||||
await Task.WhenAll(ProcessPublishedAsync(connection, context), ProcessReceivedAsync(connection, context)); | |||||
await context.WaitAsync(_waitingInterval); | await context.WaitAsync(_waitingInterval); | ||||
} | } | ||||
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) | private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) | ||||
{ | { | ||||
context.ThrowIfStopping(); | |||||
var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry); | var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry); | ||||
foreach (var message in messages) | foreach (var message in messages) | ||||
@@ -61,17 +61,17 @@ namespace DotNetCore.CAP.Processor | |||||
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) | private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context) | ||||
{ | { | ||||
context.ThrowIfStopping(); | |||||
var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry); | var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry); | ||||
foreach (var message in messages) | foreach (var message in messages) | ||||
{ | { | ||||
await _subscriberExecutor.ExecuteAsync(message); | await _subscriberExecutor.ExecuteAsync(message); | ||||
context.ThrowIfStopping(); | |||||
await context.WaitAsync(_delay); | await context.WaitAsync(_delay); | ||||
} | } | ||||
} | |||||
} | |||||
private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<Task<IEnumerable<T>>> getMessagesAsync) | private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<Task<IEnumerable<T>>> getMessagesAsync) | ||||
{ | { | ||||
@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Test | |||||
services.AddSingleton(_mockSerialiser.Object); | services.AddSingleton(_mockSerialiser.Object); | ||||
services.AddSingleton(_mockMessagePacker.Object); | services.AddSingleton(_mockMessagePacker.Object); | ||||
services.AddSingleton(_mockModelBinderFactory.Object); | services.AddSingleton(_mockModelBinderFactory.Object); | ||||
_serviceProvider = services.BuildServiceProvider(); | |||||
_serviceProvider = services.BuildServiceProvider(); | |||||
} | } | ||||
private ConsumerInvokerFactory Create() => | private ConsumerInvokerFactory Create() => | ||||
@@ -74,7 +74,7 @@ namespace DotNetCore.CAP.Test | |||||
Assert.Throws<Exception>(() => | Assert.Throws<Exception>(() => | ||||
{ | { | ||||
invoker.InvokeAsync(context).GetAwaiter().GetResult(); | invoker.InvokeAsync(context).GetAwaiter().GetResult(); | ||||
}); | |||||
}); | |||||
} | } | ||||
} | } | ||||
} | } |