Browse Source

Fixed SendAsync or ExecuteAsync recursion retries bug. (#160)

undefined
Savorboard 6 years ago
parent
commit
87b9a37760
2 changed files with 47 additions and 44 deletions
  1. +23
    -21
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  2. +24
    -23
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs

+ 23
- 21
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -43,6 +43,23 @@ namespace DotNetCore.CAP
public abstract Task<OperateResult> PublishAsync(string keyName, string content); public abstract Task<OperateResult> PublishAsync(string keyName, string content);


public async Task<OperateResult> SendAsync(CapPublishedMessage message) public async Task<OperateResult> SendAsync(CapPublishedMessage message)
{
bool retry;
OperateResult result;
do
{
result = await SendWithoutRetryAsync(message);
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
} while (retry);

return result;
}

private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage message)
{ {
var startTime = DateTimeOffset.UtcNow; var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
@@ -69,28 +86,23 @@ namespace DotNetCore.CAP
{ {
TracingError(operationId, message, result, startTime, stopwatch.Elapsed); TracingError(operationId, message, result, startTime, stopwatch.Elapsed);


await SetFailedState(message, result.Exception, out bool stillRetry);

if (stillRetry)
{
_logger.SenderRetrying(message.Id, message.Retries);
await SetFailedState(message, result.Exception);


return await SendAsync(message);
}
return OperateResult.Failed(result.Exception); return OperateResult.Failed(result.Exception);
} }
} }


private static bool UpdateMessageForRetryAsync(CapPublishedMessage message)
private bool UpdateMessageForRetry(CapPublishedMessage message)
{ {
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries; var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount) if (retries >= retryBehavior.RetryCount)
{ {
return false; return false;
} }


_logger.SenderRetrying(message.Id, retries);

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due; message.ExpiresAt = due;


@@ -100,23 +112,13 @@ namespace DotNetCore.CAP
private Task SetSuccessfulState(CapPublishedMessage message) private Task SetSuccessfulState(CapPublishedMessage message)
{ {
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter); var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection); return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
} }


private Task SetFailedState(CapPublishedMessage message, Exception ex, out bool stillRetry)
private Task SetFailedState(CapPublishedMessage message, Exception ex)
{ {
IState newState = new FailedState();
stillRetry = UpdateMessageForRetryAsync(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}

AddErrorReasonToContent(message, ex); AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, newState, _connection);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
} }


private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception) private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)


+ 24
- 23
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -50,6 +50,23 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; } private IConsumerInvoker Invoker { get; }


public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message) public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message)
{
bool retry;
OperateResult result;
do
{
result = await ExecuteWithoutRetryAsync(message);
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
} while (retry);

return result;
}

private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{ {
if (message == null) if (message == null)
{ {
@@ -74,11 +91,7 @@ namespace DotNetCore.CAP
{ {
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}"); _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");


await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry)
{
return await ExecuteAsync(message);
}
await SetFailedState(message, ex);


return OperateResult.Failed(ex); return OperateResult.Failed(ex);
} }
@@ -91,43 +104,31 @@ namespace DotNetCore.CAP
return _stateChanger.ChangeStateAsync(message, succeededState, _connection); return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
} }


private Task SetFailedState(CapReceivedMessage message, Exception ex, out bool stillRetry)
private Task SetFailedState(CapReceivedMessage message, Exception ex)
{ {
IState newState = new FailedState();

if (ex is SubscriberNotFoundException) if (ex is SubscriberNotFoundException)
{ {
stillRetry = false;
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
} }
else
{
stillRetry = UpdateMessageForRetry(message);
if (stillRetry)
{
_logger.ConsumerExecutionFailedWillRetry(ex);
return Task.CompletedTask;
}
}


AddErrorReasonToContent(message, ex); AddErrorReasonToContent(message, ex);


return _stateChanger.ChangeStateAsync(message, newState, _connection);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
} }


private static bool UpdateMessageForRetry(CapReceivedMessage message)
private bool UpdateMessageForRetry(CapReceivedMessage message)
{ {
var retryBehavior = RetryBehavior.DefaultRetry; var retryBehavior = RetryBehavior.DefaultRetry;


var retries = ++message.Retries; var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{ {
return false; return false;
} }
_logger.ConsumerExecutionRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due; message.ExpiresAt = due;

return true; return true;
} }




Loading…
Cancel
Save