@@ -27,8 +27,6 @@ namespace DotNetCore.CAP | |||
throw new ArgumentNullException(nameof(app)); | |||
} | |||
CheckRequirement(app); | |||
var provider = app.ApplicationServices; | |||
var options = provider.GetService<DashboardOptions>(); | |||
@@ -137,30 +135,6 @@ namespace DotNetCore.CAP | |||
} | |||
return true; | |||
} | |||
private static void CheckRequirement(IApplicationBuilder app) | |||
{ | |||
var marker = app.ApplicationServices.GetService<CapMarkerService>(); | |||
if (marker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
"AddCap() must be called on the service collection. eg: services.AddCap(...)"); | |||
} | |||
var messageQueueMarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>(); | |||
if (messageQueueMarker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
"You must be config used message queue provider at AddCap() options! eg: services.AddCap(options=>{ options.UseKafka(...) })"); | |||
} | |||
var databaseMarker = app.ApplicationServices.GetService<CapStorageMarkerService>(); | |||
if (databaseMarker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
"You must be config used database provider at AddCap() options! eg: services.AddCap(options=>{ options.UseSqlServer(...) })"); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -43,7 +43,7 @@ namespace Microsoft.Extensions.DependencyInjection | |||
//Processors | |||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, IDispatcher>(sp => sp.GetRequiredService<IDispatcher>())); | |||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerRegister>()); | |||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, IConsumerRegister>(sp => sp.GetRequiredService<IConsumerRegister>())); | |||
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>()); | |||
//Queue's message processor | |||
@@ -6,7 +6,7 @@ using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Persistence; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
@@ -17,30 +17,28 @@ namespace DotNetCore.CAP.Internal | |||
/// </summary> | |||
internal class Bootstrapper : BackgroundService, IBootstrapper | |||
{ | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly ILogger<Bootstrapper> _logger; | |||
private IEnumerable<IProcessingServer> _processors; | |||
private CancellationTokenSource _cts = new CancellationTokenSource(); | |||
public Bootstrapper( | |||
ILogger<Bootstrapper> logger, | |||
IStorageInitializer storage, | |||
IEnumerable<IProcessingServer> processors) | |||
public Bootstrapper(IServiceProvider serviceProvider, ILogger<Bootstrapper> logger) | |||
{ | |||
_serviceProvider = serviceProvider; | |||
_logger = logger; | |||
Storage = storage; | |||
Processors = processors; | |||
} | |||
private IStorageInitializer Storage { get; } | |||
private IEnumerable<IProcessingServer> Processors { get; } | |||
public async Task BootstrapAsync() | |||
{ | |||
_logger.LogDebug("### CAP background task is starting."); | |||
CheckRequirement(); | |||
try | |||
{ | |||
await Storage.InitializeAsync(_cts.Token); | |||
_processors = _serviceProvider.GetServices<IProcessingServer>(); | |||
await _serviceProvider.GetRequiredService<IStorageInitializer>().InitializeAsync(_cts.Token); | |||
} | |||
catch (Exception e) | |||
{ | |||
@@ -50,8 +48,9 @@ namespace DotNetCore.CAP.Internal | |||
_cts.Token.Register(() => | |||
{ | |||
_logger.LogDebug("### CAP background task is stopping."); | |||
foreach (var item in Processors) | |||
foreach (var item in _processors) | |||
{ | |||
try | |||
{ | |||
@@ -71,7 +70,7 @@ namespace DotNetCore.CAP.Internal | |||
protected virtual Task BootstrapCoreAsync() | |||
{ | |||
foreach (var item in Processors) | |||
foreach (var item in _processors) | |||
{ | |||
_cts.Token.ThrowIfCancellationRequested(); | |||
@@ -107,8 +106,38 @@ namespace DotNetCore.CAP.Internal | |||
public override async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
_cts?.Cancel(); | |||
await base.StopAsync(cancellationToken); | |||
} | |||
private void CheckRequirement() | |||
{ | |||
var marker = _serviceProvider.GetService<CapMarkerService>(); | |||
if (marker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
"AddCap() must be added on the service collection. eg: services.AddCap(...)"); | |||
} | |||
var messageQueueMarker = _serviceProvider.GetService<CapMessageQueueMakerService>(); | |||
if (messageQueueMarker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
$"You must be config transport provider for CAP!" + Environment.NewLine + | |||
$"==================================================================================" + Environment.NewLine + | |||
$"======== eg: services.AddCap( options => {{ options.UseRabbitMQ(...) }}); ========" + Environment.NewLine + | |||
$"=================================================================================="); | |||
} | |||
var databaseMarker = _serviceProvider.GetService<CapStorageMarkerService>(); | |||
if (databaseMarker == null) | |||
{ | |||
throw new InvalidOperationException( | |||
$"You must be config storage provider for CAP!" + Environment.NewLine + | |||
$"===================================================================================" + Environment.NewLine + | |||
$"======== eg: services.AddCap( options => {{ options.UseSqlServer(...) }}); ========" + Environment.NewLine + | |||
$"==================================================================================="); | |||
} | |||
} | |||
} | |||
} |
@@ -22,15 +22,15 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly IConsumerClientFactory _consumerClientFactory; | |||
private readonly IDispatcher _dispatcher; | |||
private readonly ISerializer _serializer; | |||
private readonly IDataStorage _storage; | |||
private readonly MethodMatcherCache _selector; | |||
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); | |||
private readonly CapOptions _options; | |||
private IConsumerClientFactory _consumerClientFactory; | |||
private IDispatcher _dispatcher; | |||
private ISerializer _serializer; | |||
private IDataStorage _storage; | |||
private MethodMatcherCache _selector; | |||
private CancellationTokenSource _cts; | |||
private BrokerAddress _serverAddress; | |||
private Task _compositeTask; | |||
@@ -41,18 +41,12 @@ namespace DotNetCore.CAP.Internal | |||
// ReSharper disable once InconsistentNaming | |||
private static readonly DiagnosticListener s_diagnosticListener = | |||
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); | |||
public ConsumerRegister(ILogger<ConsumerRegister> logger, IServiceProvider serviceProvider) | |||
{ | |||
_logger = logger; | |||
_serviceProvider = serviceProvider; | |||
_options = serviceProvider.GetService<IOptions<CapOptions>>().Value; | |||
_selector = serviceProvider.GetService<MethodMatcherCache>(); | |||
_consumerClientFactory = serviceProvider.GetService<IConsumerClientFactory>(); | |||
_dispatcher = serviceProvider.GetService<IDispatcher>(); | |||
_serializer = serviceProvider.GetService<ISerializer>(); | |||
_storage = serviceProvider.GetService<IDataStorage>(); | |||
_cts = new CancellationTokenSource(); | |||
} | |||
@@ -63,6 +57,12 @@ namespace DotNetCore.CAP.Internal | |||
public void Start(CancellationToken stoppingToken) | |||
{ | |||
_selector = _serviceProvider.GetService<MethodMatcherCache>(); | |||
_dispatcher = _serviceProvider.GetService<IDispatcher>(); | |||
_serializer = _serviceProvider.GetService<ISerializer>(); | |||
_storage = _serviceProvider.GetService<IDataStorage>(); | |||
_consumerClientFactory = _serviceProvider.GetService<IConsumerClientFactory>(); | |||
stoppingToken.Register(() => _cts?.Cancel()); | |||
Execute(); | |||
@@ -5,38 +5,34 @@ using System; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Persistence; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.DependencyInjection; | |||
namespace DotNetCore.CAP.Processor | |||
{ | |||
public class CollectorProcessor : IProcessor | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly IStorageInitializer _initializer; | |||
private readonly IDataStorage _storage; | |||
private readonly IServiceProvider _serviceProvider; | |||
private const int ItemBatch = 1000; | |||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | |||
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); | |||
public CollectorProcessor( | |||
ILogger<CollectorProcessor> logger, | |||
IStorageInitializer initializer, | |||
IDataStorage storage) | |||
private readonly string[] _tableNames; | |||
public CollectorProcessor(ILogger<CollectorProcessor> logger, IServiceProvider serviceProvider) | |||
{ | |||
_logger = logger; | |||
_initializer = initializer; | |||
_storage = storage; | |||
} | |||
_serviceProvider = serviceProvider; | |||
var initializer = _serviceProvider.GetService<IStorageInitializer>(); | |||
_tableNames = new[] { initializer.GetPublishedTableName(), initializer.GetReceivedTableName() }; | |||
} | |||
public async Task ProcessAsync(ProcessingContext context) | |||
{ | |||
var tables = new[] | |||
{ | |||
_initializer.GetPublishedTableName(), | |||
_initializer.GetReceivedTableName() | |||
}; | |||
foreach (var table in tables) | |||
foreach (var table in _tableNames) | |||
{ | |||
_logger.LogDebug($"Collecting expired data from table: {table}"); | |||
@@ -44,7 +40,8 @@ namespace DotNetCore.CAP.Processor | |||
var time = DateTime.Now; | |||
do | |||
{ | |||
deletedCount = await _storage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); | |||
deletedCount = await _serviceProvider.GetService<IDataStorage>() | |||
.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); | |||
if (deletedCount != 0) | |||
{ | |||