diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index b1109a3..dbd2212 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -80,18 +80,29 @@ namespace DotNetCore.CAP.Processor using (var transaction = connection.CreateTransaction()) { - try + var result = await _publishExecutor.PublishAsync(message.Name, message.Content); + if (result.Succeeded) { - await _publishExecutor.PublishAsync(message.Name, message.Content); - _stateChanger.ChangeState(message, new SucceededState(), transaction); + _logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id); } - catch (Exception e) + else { - message.Content = Helper.AddExceptionProperty(message.Content, e); + message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); + message.Retries++; + if (message.StatusName == StatusName.Scheduled) + { + message.ExpiresAt = GetDueTime(message.Added, message.Retries); + message.StatusName = StatusName.Failed; + } transaction.UpdateMessage(message); - } + if (message.Retries >= _options.FailedRetryCount) + { + _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + + "MessageId:" + message.Id); + } + } await transaction.CommitAsync(); } @@ -126,12 +137,44 @@ namespace DotNetCore.CAP.Processor } } - await _subscriberExecutor.ExecuteAsync(message); + using (var transaction = connection.CreateTransaction()) + { + var result = await _subscriberExecutor.ExecuteAsync(message); + if (result.Succeeded) + { + _stateChanger.ChangeState(message, new SucceededState(), transaction); + _logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id); + } + else + { + message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); + message.Retries++; + if (message.StatusName == StatusName.Scheduled) + { + message.ExpiresAt = GetDueTime(message.Added, message.Retries); + message.StatusName = StatusName.Failed; + } + transaction.UpdateMessage(message); + + if (message.Retries >= _options.FailedRetryCount) + { + _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + + "We will stop retrying to execute the message. message id:" + message.Id); + } + } + await transaction.CommitAsync(); + } context.ThrowIfStopping(); await context.WaitAsync(_delay); } } + + public DateTime GetDueTime(DateTime addedTime, int retries) + { + var retryBehavior = RetryBehavior.DefaultRetry; + return addedTime.AddSeconds(retryBehavior.RetryIn(retries)); + } } } \ No newline at end of file