@@ -95,10 +95,8 @@ namespace DotNetCore.CAP.Internal | |||||
{ | { | ||||
var needRetry = UpdateMessageForRetry(message); | var needRetry = UpdateMessageForRetry(message); | ||||
if (message.ExpiresAt != null) | |||||
{ | |||||
message.ExpiresAt = DateTime.Now.AddDays(15); | |||||
} | |||||
message.ExpiresAt = message.Added.AddDays(15); | |||||
await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed); | await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed); | ||||
return needRetry; | return needRetry; | ||||
@@ -106,12 +104,8 @@ namespace DotNetCore.CAP.Internal | |||||
private bool UpdateMessageForRetry(MediumMessage message) | private bool UpdateMessageForRetry(MediumMessage message) | ||||
{ | { | ||||
var retryBehavior = RetryBehavior.DefaultRetry; | |||||
var retries = ++message.Retries; | var retries = ++message.Retries; | ||||
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); | |||||
var retryCount = Math.Min(_options.Value.FailedRetryCount, retryBehavior.RetryCount); | |||||
var retryCount = Math.Min(_options.Value.FailedRetryCount, 3); | |||||
if (retries >= retryCount) | if (retries >= retryCount) | ||||
{ | { | ||||
if (retries == _options.Value.FailedRetryCount) | if (retries == _options.Value.FailedRetryCount) | ||||
@@ -194,7 +188,7 @@ namespace DotNetCore.CAP.Internal | |||||
s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublish, eventData); | s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublish, eventData); | ||||
} | } | ||||
} | |||||
} | |||||
#endregion | #endregion | ||||
} | } |
@@ -128,6 +128,8 @@ namespace DotNetCore.CAP.Internal | |||||
var needRetry = UpdateMessageForRetry(message); | var needRetry = UpdateMessageForRetry(message); | ||||
message.ExpiresAt = message.Added.AddDays(15); | |||||
await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed); | await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed); | ||||
return needRetry; | return needRetry; | ||||
@@ -135,12 +137,9 @@ namespace DotNetCore.CAP.Internal | |||||
private bool UpdateMessageForRetry(MediumMessage message) | private bool UpdateMessageForRetry(MediumMessage message) | ||||
{ | { | ||||
var retryBehavior = RetryBehavior.DefaultRetry; | |||||
var retries = ++message.Retries; | var retries = ++message.Retries; | ||||
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); | |||||
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount); | |||||
var retryCount = Math.Min(_options.FailedRetryCount, 3); | |||||
if (retries >= retryCount) | if (retries >= retryCount) | ||||
{ | { | ||||
if (retries == _options.FailedRetryCount) | if (retries == _options.FailedRetryCount) | ||||
@@ -197,7 +196,7 @@ namespace DotNetCore.CAP.Internal | |||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
var e = new SubscriberExecutionFailedException(ex.Message, ex); | var e = new SubscriberExecutionFailedException(ex.Message, ex); | ||||
TracingError(tracingTimestamp, message.Origin, descriptor.MethodInfo, e); | TracingError(tracingTimestamp, message.Origin, descriptor.MethodInfo, e); | ||||
throw e; | throw e; | ||||
@@ -1,79 +0,0 @@ | |||||
// Copyright (c) .NET Core Community. All rights reserved. | |||||
// Licensed under the MIT License. See License.txt in the project root for license information. | |||||
using System; | |||||
namespace DotNetCore.CAP.Processor | |||||
{ | |||||
public class RetryBehavior | |||||
{ | |||||
public static readonly int DefaultRetryCount; | |||||
public static readonly Func<int, int> DefaultRetryInThunk; | |||||
public static readonly RetryBehavior DefaultRetry; | |||||
public static readonly RetryBehavior NoRetry; | |||||
// ReSharper disable once InconsistentNaming | |||||
private static readonly Random _random = new Random(); | |||||
private readonly Func<int, int> _retryInThunk; | |||||
static RetryBehavior() | |||||
{ | |||||
DefaultRetryCount = 3; | |||||
DefaultRetryInThunk = retries => | |||||
(int) Math.Round(Math.Pow(retries - 1, 4) + 3 + _random.Next(30) * retries); | |||||
DefaultRetry = new RetryBehavior(true); | |||||
NoRetry = new RetryBehavior(false); | |||||
} | |||||
public RetryBehavior(bool retry) | |||||
: this(retry, DefaultRetryCount, DefaultRetryInThunk) | |||||
{ | |||||
} | |||||
/// <summary> | |||||
/// Initializes a new instance of the <see cref="RetryBehavior" /> class. | |||||
/// </summary> | |||||
/// <param name="retry">Whether to retry.</param> | |||||
/// <param name="retryCount">The maximum retry count.</param> | |||||
/// <param name="retryInThunk">The retry in function to use.</param> | |||||
public RetryBehavior(bool retry, int retryCount, Func<int, int> retryInThunk) | |||||
{ | |||||
if (retry) | |||||
{ | |||||
if (retryCount < 0) | |||||
{ | |||||
throw new ArgumentOutOfRangeException(nameof(retryCount), "Can't be negative."); | |||||
} | |||||
} | |||||
Retry = retry; | |||||
RetryCount = retryCount; | |||||
_retryInThunk = retryInThunk ?? DefaultRetryInThunk; | |||||
} | |||||
public Random Random => _random; | |||||
/// <summary> | |||||
/// Gets whether to retry or disable retrying. | |||||
/// </summary> | |||||
public bool Retry { get; } | |||||
/// <summary> | |||||
/// Gets the maximum retry count. | |||||
/// </summary> | |||||
public int RetryCount { get; } | |||||
/// <summary> | |||||
/// Returns the seconds to delay before retrying again. | |||||
/// </summary> | |||||
/// <param name="retries">The current retry count.</param> | |||||
/// <returns>The seconds to delay.</returns> | |||||
public int RetryIn(int retries) | |||||
{ | |||||
return _retryInThunk(retries); | |||||
} | |||||
} | |||||
} |