From 8e107693da24dfc84328cd9c03aa608692291fa9 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sun, 8 Oct 2017 23:31:10 +0800 Subject: [PATCH] rename file. --- ...ssor.FailedJob.cs => IProcessor.Failed.cs} | 50 ++++++++++++++++--- 1 file changed, 43 insertions(+), 7 deletions(-) rename src/DotNetCore.CAP/Processor/{IProcessor.FailedJob.cs => IProcessor.Failed.cs} (66%) diff --git a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs b/src/DotNetCore.CAP/Processor/IProcessor.Failed.cs similarity index 66% rename from src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs rename to src/DotNetCore.CAP/Processor/IProcessor.Failed.cs index e778f52..d400536 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.Failed.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.DependencyInjection; @@ -8,26 +11,32 @@ using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Processor { - public class FailedJobProcessor : IProcessor + public class FailedProcessor : IProcessor { private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly ILogger _logger; private readonly CapOptions _options; private readonly IServiceProvider _provider; private readonly IStateChanger _stateChanger; + private readonly ISubscriberExecutor _subscriberExecutor; + private readonly IPublishExecutor _publishExecutor; private readonly TimeSpan _waitingInterval; - public FailedJobProcessor( + public FailedProcessor( IOptions options, - ILogger logger, + ILogger logger, IServiceProvider provider, - IStateChanger stateChanger) + IStateChanger stateChanger, + ISubscriberExecutor subscriberExecutor, + IPublishExecutor publishExecutor) { _options = options.Value; _logger = logger; _provider = provider; _stateChanger = stateChanger; - _waitingInterval = TimeSpan.FromSeconds(_options.FailedMessageWaitingInterval); + _subscriberExecutor = subscriberExecutor; + _publishExecutor = publishExecutor; + _waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval); } public async Task ProcessAsync(ProcessingContext context) @@ -57,6 +66,9 @@ namespace DotNetCore.CAP.Processor foreach (var message in messages) { + if (message.Retries > _options.FailedRetryCount) + continue; + if (!hasException) try { @@ -70,7 +82,18 @@ namespace DotNetCore.CAP.Processor using (var transaction = connection.CreateTransaction()) { - _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + try + { + await _publishExecutor.PublishAsync(message.Name, message.Content); + + _stateChanger.ChangeState(message, new SucceededState(), transaction); + } + catch (Exception e) + { + message.Content = Helper.AddExceptionProperty(message.Content, e); + message.Retries++; + transaction.UpdateMessage(message); + } await transaction.CommitAsync(); } @@ -87,6 +110,9 @@ namespace DotNetCore.CAP.Processor foreach (var message in messages) { + if (message.Retries > _options.FailedRetryCount) + continue; + if (!hasException) try { @@ -100,7 +126,17 @@ namespace DotNetCore.CAP.Processor using (var transaction = connection.CreateTransaction()) { - _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + var ret = await _subscriberExecutor.ExecuteAsync(message); + if (ret.Succeeded) + { + _stateChanger.ChangeState(message, new SucceededState(), transaction); + } + else + { + message.Retries++; + message.Content = Helper.AddExceptionProperty(message.Content, ret.Exception); + transaction.UpdateMessage(message); + } await transaction.CommitAsync(); }