diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index b5d56c3..a324a98 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.Threading.Tasks; @@ -55,6 +56,16 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; } } + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + // CapReceviedMessage public async Task StoreReceivedMessageAsync(CapReceivedMessage message) @@ -89,6 +100,15 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; } } + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + using (var connection = new SqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + public void Dispose() { } diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 9c4a872..8128441 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -22,9 +22,14 @@ namespace DotNetCore.CAP } /// - /// Productor job polling delay time. Default is 8 sec. + /// Productor job polling delay time. Default is 5 sec. /// - public int PollingDelay { get; set; } = 8; + public int PollingDelay { get; set; } = 5; + + /// + /// Failed messages polling delay time. Default is 2 min. + /// + public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2); /// /// We’ll send a POST request to the URL below with details of any subscribed events. diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 7e3908d..60a5fca 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -47,6 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection //Processors services.AddTransient(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); //Executors diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index 4c16a5f..e481a56 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using DotNetCore.CAP.Models; @@ -27,6 +28,11 @@ namespace DotNetCore.CAP /// Task GetNextPublishedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed messages. + /// + Task> GetFailedPublishedMessages(); + // Received messages /// @@ -46,6 +52,10 @@ namespace DotNetCore.CAP /// Task GetNextReceviedMessageToBeEnqueuedAsync(); + /// + /// Returns executed failed message. + /// + Task> GetFailedReceviedMessages(); //----------------------------------------- /// diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index 5843382..d8328bc 100644 --- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -117,6 +117,7 @@ namespace DotNetCore.CAP.Processor returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); + returnedProcessors.Add(_provider.GetRequiredService()); returnedProcessors.Add(_provider.GetRequiredService()); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs new file mode 100644 index 0000000..9d5f12d --- /dev/null +++ b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Processor.States; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor +{ + public class FailedJobProcessor : IProcessor + { + private readonly CapOptions _options; + private readonly ILogger _logger; + private readonly IServiceProvider _provider; + private readonly IStateChanger _stateChanger; + + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval; + + public FailedJobProcessor( + IOptions options, + ILogger logger, + IServiceProvider provider, + IStateChanger stateChanger) + { + _options = options.Value; + _logger = logger; + _provider = provider; + _stateChanger = stateChanger; + _waitingInterval = _options.FailedMessageWaitingInterval; + } + + public async Task ProcessAsync(ProcessingContext context) + { + if (context == null) + throw new ArgumentNullException(nameof(context)); + + using (var scope = _provider.CreateScope()) + { + var provider = scope.ServiceProvider; + var connection = provider.GetRequiredService(); + + await Task.WhenAll( + ProcessPublishedAsync(connection, context), + ProcessReceivededAsync(connection, context)); + + DefaultDispatcher.PulseEvent.Set(); + + await context.WaitAsync(_waitingInterval); + } + } + + private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedPublishedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + + private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context) + { + var messages = await connection.GetFailedReceviedMessages(); + foreach (var message in messages) + { + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, new EnqueuedState(), transaction); + await transaction.CommitAsync(); + } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); + } + } + } +}