|
|
@@ -14,23 +14,19 @@ namespace DotNetCore.CAP |
|
|
|
public class SubscibeQueueExecutor : IQueueExecutor |
|
|
|
{ |
|
|
|
private readonly IConsumerInvokerFactory _consumerInvokerFactory; |
|
|
|
private readonly IConsumerClientFactory _consumerClientFactory; |
|
|
|
private readonly IStateChanger _stateChanger; |
|
|
|
private readonly ILogger _logger; |
|
|
|
|
|
|
|
private readonly MethodMatcherCache _selector; |
|
|
|
//private readonly CapOptions _options; |
|
|
|
|
|
|
|
public SubscibeQueueExecutor( |
|
|
|
IStateChanger stateChanger, |
|
|
|
MethodMatcherCache selector, |
|
|
|
IConsumerInvokerFactory consumerInvokerFactory, |
|
|
|
IConsumerClientFactory consumerClientFactory, |
|
|
|
ILogger<BasePublishQueueExecutor> logger) |
|
|
|
{ |
|
|
|
_selector = selector; |
|
|
|
_consumerInvokerFactory = consumerInvokerFactory; |
|
|
|
_consumerClientFactory = consumerClientFactory; |
|
|
|
_stateChanger = stateChanger; |
|
|
|
_logger = logger; |
|
|
|
} |
|
|
@@ -53,7 +49,7 @@ namespace DotNetCore.CAP |
|
|
|
var newState = default(IState); |
|
|
|
if (!result.Succeeded) |
|
|
|
{ |
|
|
|
var shouldRetry = await UpdateJobForRetryAsync(message, connection); |
|
|
|
var shouldRetry = await UpdateMessageForRetryAsync(message, connection); |
|
|
|
if (shouldRetry) |
|
|
|
{ |
|
|
|
newState = new ScheduledState(); |
|
|
@@ -123,7 +119,7 @@ namespace DotNetCore.CAP |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task<bool> UpdateJobForRetryAsync(CapReceivedMessage message, IStorageConnection connection) |
|
|
|
private async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection) |
|
|
|
{ |
|
|
|
var retryBehavior = RetryBehavior.DefaultRetry; |
|
|
|
|
|
|
|