diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs index a64a6a7..d36bc51 100644 --- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs +++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs @@ -43,6 +43,23 @@ namespace DotNetCore.CAP public abstract Task PublishAsync(string keyName, string content); public async Task SendAsync(CapPublishedMessage message) + { + bool retry; + OperateResult result; + do + { + result = await SendWithoutRetryAsync(message); + if (result == OperateResult.Success) + { + return result; + } + retry = UpdateMessageForRetry(message); + } while (retry); + + return result; + } + + private async Task SendWithoutRetryAsync(CapPublishedMessage message) { var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew(); @@ -69,28 +86,23 @@ namespace DotNetCore.CAP { TracingError(operationId, message, result, startTime, stopwatch.Elapsed); - await SetFailedState(message, result.Exception, out bool stillRetry); - - if (stillRetry) - { - _logger.SenderRetrying(message.Id, message.Retries); + await SetFailedState(message, result.Exception); - return await SendAsync(message); - } return OperateResult.Failed(result.Exception); } } - private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) + private bool UpdateMessageForRetry(CapPublishedMessage message) { var retryBehavior = RetryBehavior.DefaultRetry; - var retries = ++message.Retries; if (retries >= retryBehavior.RetryCount) { return false; } + _logger.SenderRetrying(message.Id, retries); + var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); message.ExpiresAt = due; @@ -100,23 +112,13 @@ namespace DotNetCore.CAP private Task SetSuccessfulState(CapPublishedMessage message) { var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter); - return _stateChanger.ChangeStateAsync(message, succeededState, _connection); } - private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry) + private Task SetFailedState(CapPublishedMessage message, Exception ex) { - IState newState = new FailedState(); - stillRetry = UpdateMessageForRetryAsync(message); - if (stillRetry) - { - _logger.ConsumerExecutionFailedWillRetry(ex); - return Task.CompletedTask; - } - AddErrorReasonToContent(message, ex); - - return _stateChanger.ChangeStateAsync(message, newState, _connection); + return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection); } private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception) diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index d069d95..f56dc73 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -50,6 +50,23 @@ namespace DotNetCore.CAP private IConsumerInvoker Invoker { get; } public async Task ExecuteAsync(CapReceivedMessage message) + { + bool retry; + OperateResult result; + do + { + result = await ExecuteWithoutRetryAsync(message); + if (result == OperateResult.Success) + { + return result; + } + retry = UpdateMessageForRetry(message); + } while (retry); + + return result; + } + + private async Task ExecuteWithoutRetryAsync(CapReceivedMessage message) { if (message == null) { @@ -74,11 +91,7 @@ namespace DotNetCore.CAP { _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}"); - await SetFailedState(message, ex, out bool stillRetry); - if (stillRetry) - { - return await ExecuteAsync(message); - } + await SetFailedState(message, ex); return OperateResult.Failed(ex); } @@ -91,43 +104,31 @@ namespace DotNetCore.CAP return _stateChanger.ChangeStateAsync(message, succeededState, _connection); } - private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry) + private Task SetFailedState(CapReceivedMessage message, Exception ex) { - IState newState = new FailedState(); - if (ex is SubscriberNotFoundException) { - stillRetry = false; message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException } - else - { - stillRetry = UpdateMessageForRetry(message); - if (stillRetry) - { - _logger.ConsumerExecutionFailedWillRetry(ex); - return Task.CompletedTask; - } - } AddErrorReasonToContent(message, ex); - return _stateChanger.ChangeStateAsync(message, newState, _connection); + return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection); } - private static bool UpdateMessageForRetry(CapReceivedMessage message) + private bool UpdateMessageForRetry(CapReceivedMessage message) { var retryBehavior = RetryBehavior.DefaultRetry; var retries = ++message.Retries; - if (retries >= retryBehavior.RetryCount) + var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount); + if (retries >= retryCount) { return false; } - + _logger.ConsumerExecutionRetrying(message.Id, retries); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); message.ExpiresAt = due; - return true; }