diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs
index 59c9ee4..13a3af4 100644
--- a/src/DotNetCore.CAP/CAP.Options.cs
+++ b/src/DotNetCore.CAP/CAP.Options.cs
@@ -14,34 +14,14 @@ namespace DotNetCore.CAP
///
public class CapOptions
{
- ///
- /// Default succeeded message expiration time span, in seconds.
- ///
- public const int DefaultSucceedMessageExpirationAfter = 24 * 3600;
-
- ///
- /// Failed message retry waiting interval.
- ///
- public const int DefaultFailedMessageWaitingInterval = 60;
-
- ///
- /// Failed message retry count.
- ///
- public const int DefaultFailedRetryCount = 50;
-
- ///
- /// Default version
- ///
- public const string DefaultVersion = "v1";
-
-
public CapOptions()
{
- SucceedMessageExpiredAfter = DefaultSucceedMessageExpirationAfter;
- FailedRetryInterval = DefaultFailedMessageWaitingInterval;
- FailedRetryCount = DefaultFailedRetryCount;
+ SucceedMessageExpiredAfter = 24 * 3600;
+ FailedRetryInterval = 60;
+ FailedRetryCount = 50;
+ ConsumerThreadCount = 1;
Extensions = new List();
- Version = DefaultVersion;
+ Version = "v1";
DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower();
}
@@ -80,6 +60,12 @@ namespace DotNetCore.CAP
///
public int FailedRetryCount { get; set; }
+ ///
+ /// The number of consumer thread connections.
+ /// Default is 1
+ ///
+ public int ConsumerThreadCount { get; set; }
+
///
/// Registers an extension that will be executed when building services.
///
diff --git a/src/DotNetCore.CAP/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/IConsumerRegister.Default.cs
index 53b5320..22bf989 100644
--- a/src/DotNetCore.CAP/IConsumerRegister.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerRegister.Default.cs
@@ -11,6 +11,7 @@ using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
@@ -21,9 +22,10 @@ namespace DotNetCore.CAP
private readonly IDispatcher _dispatcher;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
+ private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;
+ private readonly CancellationTokenSource _cts;
- private CancellationTokenSource _cts;
private string _serverAddress;
private Task _compositeTask;
private bool _disposed;
@@ -34,17 +36,21 @@ namespace DotNetCore.CAP
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
- public ConsumerRegister(IConsumerClientFactory consumerClientFactory,
+ public ConsumerRegister(
+ IOptions options,
+ IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher,
IStorageConnection connection,
ILogger logger,
MethodMatcherCache selector)
{
+ _options = options.Value;
_selector = selector;
_logger = logger;
_consumerClientFactory = consumerClientFactory;
_dispatcher = dispatcher;
_connection = connection;
+ _cts = new CancellationTokenSource();
}
public bool IsHealthy()
@@ -54,43 +60,43 @@ namespace DotNetCore.CAP
public void Start()
{
- _cts = new CancellationTokenSource();
-
var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped();
foreach (var matchGroup in groupingMatches)
{
- Task.Factory.StartNew(() =>
+ for (int i = 0; i < _options.ConsumerThreadCount; i++)
{
- try
+ Task.Factory.StartNew(() =>
{
- using (var client = _consumerClientFactory.Create(matchGroup.Key))
+ try
{
- _serverAddress = client.ServersAddress;
+ using (var client = _consumerClientFactory.Create(matchGroup.Key))
+ {
+ _serverAddress = client.ServersAddress;
- RegisterMessageProcessor(client);
+ RegisterMessageProcessor(client);
- client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
+ client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
- client.Listening(_pollingDelay, _cts.Token);
+ client.Listening(_pollingDelay, _cts.Token);
+ }
}
- }
- catch (OperationCanceledException)
- {
- //ignore
- }
- catch (BrokerConnectionException e)
- {
- _isHealthy = false;
- _logger.LogError(e, e.Message);
- }
- catch (Exception e)
- {
- _logger.LogError(e, e.Message);
- }
- }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+ catch (OperationCanceledException)
+ {
+ //ignore
+ }
+ catch (BrokerConnectionException e)
+ {
+ _isHealthy = false;
+ _logger.LogError(e, e.Message);
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, e.Message);
+ }
+ }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+ }
}
-
_compositeTask = Task.CompletedTask;
}
@@ -140,6 +146,8 @@ namespace DotNetCore.CAP
{
client.OnMessageReceived += (sender, messageContext) =>
{
+ _cts.Token.ThrowIfCancellationRequested();
+
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();