|
@@ -80,18 +80,29 @@ namespace DotNetCore.CAP.Processor |
|
|
|
|
|
|
|
|
using (var transaction = connection.CreateTransaction()) |
|
|
using (var transaction = connection.CreateTransaction()) |
|
|
{ |
|
|
{ |
|
|
try |
|
|
|
|
|
|
|
|
var result = await _publishExecutor.PublishAsync(message.Name, message.Content); |
|
|
|
|
|
if (result.Succeeded) |
|
|
{ |
|
|
{ |
|
|
await _publishExecutor.PublishAsync(message.Name, message.Content); |
|
|
|
|
|
|
|
|
|
|
|
_stateChanger.ChangeState(message, new SucceededState(), transaction); |
|
|
_stateChanger.ChangeState(message, new SucceededState(), transaction); |
|
|
|
|
|
_logger.LogInformation("The message was sent successfully during the retry. MessageId:" + message.Id); |
|
|
} |
|
|
} |
|
|
catch (Exception e) |
|
|
|
|
|
|
|
|
else |
|
|
{ |
|
|
{ |
|
|
message.Content = Helper.AddExceptionProperty(message.Content, e); |
|
|
|
|
|
|
|
|
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); |
|
|
|
|
|
message.Retries++; |
|
|
|
|
|
if (message.StatusName == StatusName.Scheduled) |
|
|
|
|
|
{ |
|
|
|
|
|
message.ExpiresAt = GetDueTime(message.Added, message.Retries); |
|
|
|
|
|
message.StatusName = StatusName.Failed; |
|
|
|
|
|
} |
|
|
transaction.UpdateMessage(message); |
|
|
transaction.UpdateMessage(message); |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (message.Retries >= _options.FailedRetryCount) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + |
|
|
|
|
|
"MessageId:" + message.Id); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
await transaction.CommitAsync(); |
|
|
await transaction.CommitAsync(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -126,12 +137,44 @@ namespace DotNetCore.CAP.Processor |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
await _subscriberExecutor.ExecuteAsync(message); |
|
|
|
|
|
|
|
|
using (var transaction = connection.CreateTransaction()) |
|
|
|
|
|
{ |
|
|
|
|
|
var result = await _subscriberExecutor.ExecuteAsync(message); |
|
|
|
|
|
if (result.Succeeded) |
|
|
|
|
|
{ |
|
|
|
|
|
_stateChanger.ChangeState(message, new SucceededState(), transaction); |
|
|
|
|
|
_logger.LogInformation("The message was execute successfully during the retry. MessageId:" + message.Id); |
|
|
|
|
|
} |
|
|
|
|
|
else |
|
|
|
|
|
{ |
|
|
|
|
|
message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); |
|
|
|
|
|
message.Retries++; |
|
|
|
|
|
if (message.StatusName == StatusName.Scheduled) |
|
|
|
|
|
{ |
|
|
|
|
|
message.ExpiresAt = GetDueTime(message.Added, message.Retries); |
|
|
|
|
|
message.StatusName = StatusName.Failed; |
|
|
|
|
|
} |
|
|
|
|
|
transaction.UpdateMessage(message); |
|
|
|
|
|
|
|
|
|
|
|
if (message.Retries >= _options.FailedRetryCount) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + |
|
|
|
|
|
"We will stop retrying to execute the message. message id:" + message.Id); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
await transaction.CommitAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
context.ThrowIfStopping(); |
|
|
context.ThrowIfStopping(); |
|
|
|
|
|
|
|
|
await context.WaitAsync(_delay); |
|
|
await context.WaitAsync(_delay); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public DateTime GetDueTime(DateTime addedTime, int retries) |
|
|
|
|
|
{ |
|
|
|
|
|
var retryBehavior = RetryBehavior.DefaultRetry; |
|
|
|
|
|
return addedTime.AddSeconds(retryBehavior.RetryIn(retries)); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |