diff --git a/src/DotNetCore.CAP/IPublishExecutor.cs b/src/DotNetCore.CAP/IPublishExecutor.cs new file mode 100644 index 0000000..8a07b55 --- /dev/null +++ b/src/DotNetCore.CAP/IPublishExecutor.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace DotNetCore.CAP +{ + public interface IPublishExecutor + { + Task PublishAsync(string keyName, string content); + } +} diff --git a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs index 4fefe1d..b78e80c 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Publish.Base.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor.States; @@ -8,7 +9,7 @@ using Microsoft.Extensions.Logging; namespace DotNetCore.CAP { - public abstract class BasePublishQueueExecutor : IQueueExecutor + public abstract class BasePublishQueueExecutor : IQueueExecutor, IPublishExecutor { private readonly ILogger _logger; private readonly CapOptions _options; @@ -40,7 +41,7 @@ namespace DotNetCore.CAP IState newState; if (!result.Succeeded) { - var shouldRetry = await UpdateMessageForRetryAsync(message, connection); + var shouldRetry = UpdateMessageForRetryAsync(message); if (shouldRetry) { newState = new ScheduledState(); @@ -51,6 +52,7 @@ namespace DotNetCore.CAP newState = new FailedState(); _logger.JobFailed(result.Exception); } + message.Content = Helper.AddExceptionProperty(message.Content, result.Exception); } else { @@ -67,6 +69,7 @@ namespace DotNetCore.CAP } catch (Exception ex) { + fetched.Requeue(); _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); return OperateResult.Failed(ex); } @@ -74,8 +77,7 @@ namespace DotNetCore.CAP public abstract Task PublishAsync(string keyName, string content); - private static async Task UpdateMessageForRetryAsync(CapPublishedMessage message, - IStorageConnection connection) + private static bool UpdateMessageForRetryAsync(CapPublishedMessage message) { var retryBehavior = RetryBehavior.DefaultRetry; @@ -85,11 +87,7 @@ namespace DotNetCore.CAP var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); message.ExpiresAt = due; - using (var transaction = connection.CreateTransaction()) - { - transaction.UpdateMessage(message); - await transaction.CommitAsync(); - } + return true; } } diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs index ea748c3..f32a973 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs @@ -1,8 +1,6 @@ using System; -using System.Collections.Generic; using System.Diagnostics; using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; @@ -12,151 +10,114 @@ using Microsoft.Extensions.Logging; namespace DotNetCore.CAP { - public class SubscribeQueueExecutor : IQueueExecutor - { - private readonly IConsumerInvokerFactory _consumerInvokerFactory; - private readonly ILogger _logger; - private readonly CapOptions _options; - private readonly MethodMatcherCache _selector; - private readonly IStateChanger _stateChanger; - - public SubscribeQueueExecutor( - IStateChanger stateChanger, - MethodMatcherCache selector, - CapOptions options, - IConsumerInvokerFactory consumerInvokerFactory, - ILogger logger) - { - _selector = selector; - _options = options; - _consumerInvokerFactory = consumerInvokerFactory; - _stateChanger = stateChanger; - _logger = logger; - } - - public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) - { - //return await Task.FromResult(OperateResult.Success); - var message = await connection.GetReceivedMessageAsync(fetched.MessageId); - try - { - var sp = Stopwatch.StartNew(); - await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); - - if (message.Retries > 0) - _logger.JobRetrying(message.Retries); - var result = await ExecuteSubscribeAsync(message); - sp.Stop(); - - IState newState; - if (!result.Succeeded) - { - var shouldRetry = await UpdateMessageForRetryAsync(message, connection, result.Exception?.Message); - if (shouldRetry) - { - newState = new ScheduledState(); - _logger.JobFailedWillRetry(result.Exception); - } - else - { - newState = new FailedState(); - _logger.JobFailed(result.Exception); - } - } - else - { - newState = new SucceededState(_options.SucceedMessageExpiredAfter); - } - await _stateChanger.ChangeStateAsync(message, newState, connection); - - fetched.RemoveFromQueue(); - - if (result.Succeeded) - _logger.JobExecuted(sp.Elapsed.TotalSeconds); - - return OperateResult.Success; - } - catch (SubscriberNotFoundException ex) - { - _logger.LogError(ex.Message); - - await AddErrorReasonToContent(message, ex.Message, connection); - - await _stateChanger.ChangeStateAsync(message, new FailedState(), connection); - - fetched.RemoveFromQueue(); - - return OperateResult.Failed(ex); - } - catch (Exception ex) - { - _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); - - fetched.Requeue(); - - return OperateResult.Failed(ex); - } - } - - protected virtual async Task ExecuteSubscribeAsync(CapReceivedMessage receivedMessage) - { - try - { - var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); - - if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) - { - var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method."; - throw new SubscriberNotFoundException(error); - } - - // If there are multiple consumers in the same group, we will take the first - var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; - var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); - - await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync(); - - return OperateResult.Success; - } - catch (Exception ex) - { - _logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", - ex); - - return OperateResult.Failed(ex); - } - } - - private static async Task UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection, string exceptionMessage) - { - var retryBehavior = RetryBehavior.DefaultRetry; - - var retries = ++message.Retries; - if (retries >= retryBehavior.RetryCount) - return false; - - var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); - message.ExpiresAt = due; - - await AddErrorReasonToContent(message, exceptionMessage, connection); - - return true; - } - - public static Task AddErrorReasonToContent(CapReceivedMessage message, string description, IStorageConnection connection) - { - var exceptions = new List> - { - new KeyValuePair("ExceptionMessage", description) - }; - - message.Content = Helper.AddJsonProperty(message.Content, exceptions); - using (var transaction = connection.CreateTransaction()) - { - transaction.UpdateMessage(message); - transaction.CommitAsync(); - } - return Task.CompletedTask; - } - } + public class SubscribeQueueExecutor : IQueueExecutor + { + private readonly ILogger _logger; + private readonly CapOptions _options; + private readonly IStateChanger _stateChanger; + private readonly ISubscriberExecutor _subscriberExecutor; + + public SubscribeQueueExecutor( + CapOptions options, + IStateChanger stateChanger, + ISubscriberExecutor subscriberExecutor, + ILogger logger) + { + _options = options; + _subscriberExecutor = subscriberExecutor; + _stateChanger = stateChanger; + _logger = logger; + } + + public async Task ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched) + { + var message = await connection.GetReceivedMessageAsync(fetched.MessageId); + try + { + var sp = Stopwatch.StartNew(); + await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection); + + if (message.Retries > 0) + _logger.JobRetrying(message.Retries); + + var result = await _subscriberExecutor.ExecuteAsync(message); + sp.Stop(); + + var state = GetNewState(result, message); + + await _stateChanger.ChangeStateAsync(message, state, connection); + + fetched.RemoveFromQueue(); + + if (result.Succeeded) + _logger.JobExecuted(sp.Elapsed.TotalSeconds); + + return OperateResult.Success; + } + catch (SubscriberNotFoundException ex) + { + _logger.LogError(ex.Message); + + AddErrorReasonToContent(message, ex); + + await _stateChanger.ChangeStateAsync(message, new FailedState(), connection); + + fetched.RemoveFromQueue(); + + return OperateResult.Failed(ex); + } + catch (Exception ex) + { + _logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex); + + fetched.Requeue(); + + return OperateResult.Failed(ex); + } + } + + private IState GetNewState(OperateResult result, CapReceivedMessage message) + { + IState newState; + if (!result.Succeeded) + { + var shouldRetry = UpdateMessageForRetryAsync(message); + if (shouldRetry) + { + newState = new ScheduledState(); + _logger.JobFailedWillRetry(result.Exception); + } + else + { + newState = new FailedState(); + _logger.JobFailed(result.Exception); + } + AddErrorReasonToContent(message, result.Exception); + } + else + { + newState = new SucceededState(_options.SucceedMessageExpiredAfter); + } + return newState; + } + + private static bool UpdateMessageForRetryAsync(CapReceivedMessage message) + { + var retryBehavior = RetryBehavior.DefaultRetry; + + var retries = ++message.Retries; + if (retries >= retryBehavior.RetryCount) + return false; + + var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries)); + message.ExpiresAt = due; + + return true; + } + + private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception) + { + message.Content = Helper.AddExceptionProperty(message.Content, exception); + } + } } \ No newline at end of file