Browse Source

Fixed configuration options of FailedThresholdCallback could not be invoke when the value less then three. (#161)

master
Savorboard 6 years ago
parent
commit
7089b1ccde
2 changed files with 79 additions and 37 deletions
  1. +45
    -25
      src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  2. +34
    -12
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs

+ 45
- 25
src/DotNetCore.CAP/IPublishMessageSender.Base.cs View File

@@ -48,18 +48,19 @@ namespace DotNetCore.CAP
OperateResult result;
do
{
result = await SendWithoutRetryAsync(message);
var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage message)
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -80,34 +81,17 @@ namespace DotNetCore.CAP

TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);

return OperateResult.Success;
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);

await SetFailedState(message, result.Exception);

return OperateResult.Failed(result.Exception);
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}

_logger.SenderRetrying(message.Id, retries);

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}

private Task SetSuccessfulState(CapPublishedMessage message)
{
@@ -115,10 +99,15 @@ namespace DotNetCore.CAP
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapPublishedMessage message, Exception ex)
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{
AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
@@ -126,6 +115,37 @@ namespace DotNetCore.CAP
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.SenderRetrying(message.Id, retries);

return true;
}

private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();


+ 34
- 12
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -55,18 +55,24 @@ namespace DotNetCore.CAP
OperateResult result;
do
{
result = await ExecuteWithoutRetryAsync(message);
var executedResult = await ExecuteWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage message)
/// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message rececived of <see cref="CapReceivedMessage"/></param>
/// <returns>Item1 is need still restry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
@@ -85,26 +91,23 @@ namespace DotNetCore.CAP

_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);

return OperateResult.Success;
return (false, OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");

await SetFailedState(message, ex);

return OperateResult.Failed(ex);
return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}

private Task SetSuccessfulState(CapReceivedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapReceivedMessage message, Exception ex)
private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
{
if (ex is SubscriberNotFoundException)
{
@@ -113,7 +116,11 @@ namespace DotNetCore.CAP

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private bool UpdateMessageForRetry(CapReceivedMessage message)
@@ -121,14 +128,29 @@ namespace DotNetCore.CAP
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.ConsumerExecutionRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}



Loading…
Cancel
Save