diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 346a06d..b6768c6 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -26,6 +26,7 @@ namespace DotNetCore.CAP Extensions = new List(); Version = "v1"; DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); + CollectorCleaningInterval = 300; } internal IList Extensions { get; } @@ -85,6 +86,12 @@ namespace DotNetCore.CAP /// public int ProducerThreadCount { get; set; } + /// + /// The interval of the collector processor deletes expired messages. + /// Default is 300 seconds. + /// + public int CollectorCleaningInterval { get; set; } + /// /// Registers an extension that will be executed when building services. /// diff --git a/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs b/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs index 1a4a800..d1673f1 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.Collector.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using DotNetCore.CAP.Persistence; using Microsoft.Extensions.Logging; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Processor { @@ -15,15 +16,16 @@ namespace DotNetCore.CAP.Processor private readonly IServiceProvider _serviceProvider; private const int ItemBatch = 1000; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); + private readonly TimeSpan _waitingInterval; private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); private readonly string[] _tableNames; - public CollectorProcessor(ILogger logger, IServiceProvider serviceProvider) + public CollectorProcessor(ILogger logger, IOptions options, IServiceProvider serviceProvider) { _logger = logger; _serviceProvider = serviceProvider; + _waitingInterval = TimeSpan.FromSeconds(options.Value.CollectorCleaningInterval); var initializer = _serviceProvider.GetService();