Browse Source

Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)

undefined
Savorboard 6 years ago
parent
commit
ba7a5a2cb3
2 changed files with 47 additions and 44 deletions
  1. +23
    -21
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  2. +24
    -23
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs

+ 23
- 21
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -43,6 +43,23 @@ namespace DotNetCore.CAP
public abstract Task<OperateResult> PublishAsync(string keyName, string content);

public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{
bool retry;
OperateResult result;
do
{
result = await SendWithoutRetryAsync(message);
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
} while (retry);

return result;
}

private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -69,28 +86,23 @@ namespace DotNetCore.CAP
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);

await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry)
{
_logger.SenderRetrying(message.Id, message.Retries);
await SetFailedState(message, result.Exception);

return await SendAsync(message);
}
return OperateResult.Failed(result.Exception);
}
}

private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}

_logger.SenderRetrying(message.Id, retries);

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

@@ -100,23 +112,13 @@ namespace DotNetCore.CAP
private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
private Task SetFailedState(CapPublishedMessage message, Exception ex)
{
IState newState = new FailedState();
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
}

private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)


+ 24
- 23
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -50,6 +50,23 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; }

public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message)
{
bool retry;
OperateResult result;
do
{
result = await ExecuteWithoutRetryAsync(message);
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
} while (retry);

return result;
}

private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
@@ -74,11 +91,7 @@ namespace DotNetCore.CAP
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");

await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry)
{
return await ExecuteAsync(message);
}
await SetFailedState(message, ex);

return OperateResult.Failed(ex);
}
@@ -91,43 +104,31 @@ namespace DotNetCore.CAP
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry)
private Task SetFailedState(CapReceivedMessage message, Exception ex)
{
IState newState = new FailedState();

if (ex is SubscriberNotFoundException)
{
stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}
else
{
stillRetry = UpdateMessageForRetry(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
}

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
}

private static bool UpdateMessageForRetry(CapReceivedMessage message)
private bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
return false;
}
_logger.ConsumerExecutionRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}



Loading…
Cancel
Save