From 075c827577e731ef764dcf07ccbd51e70aad8263 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 25 Jul 2019 18:16:31 +0800 Subject: [PATCH] Support multiple consumer threads. (#295) --- src/DotNetCore.CAP/CAP.Options.cs | 36 ++++------- .../IConsumerRegister.Default.cs | 62 +++++++++++-------- 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 59c9ee4..13a3af4 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -14,34 +14,14 @@ namespace DotNetCore.CAP /// public class CapOptions { - /// - /// Default succeeded message expiration time span, in seconds. - /// - public const int DefaultSucceedMessageExpirationAfter = 24 * 3600; - - /// - /// Failed message retry waiting interval. - /// - public const int DefaultFailedMessageWaitingInterval = 60; - - /// - /// Failed message retry count. - /// - public const int DefaultFailedRetryCount = 50; - - /// - /// Default version - /// - public const string DefaultVersion = "v1"; - - public CapOptions() { - SucceedMessageExpiredAfter = DefaultSucceedMessageExpirationAfter; - FailedRetryInterval = DefaultFailedMessageWaitingInterval; - FailedRetryCount = DefaultFailedRetryCount; + SucceedMessageExpiredAfter = 24 * 3600; + FailedRetryInterval = 60; + FailedRetryCount = 50; + ConsumerThreadCount = 1; Extensions = new List(); - Version = DefaultVersion; + Version = "v1"; DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); } @@ -80,6 +60,12 @@ namespace DotNetCore.CAP /// public int FailedRetryCount { get; set; } + /// + /// The number of consumer thread connections. + /// Default is 1 + /// + public int ConsumerThreadCount { get; set; } + /// /// Registers an extension that will be executed when building services. /// diff --git a/src/DotNetCore.CAP/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/IConsumerRegister.Default.cs index 53b5320..22bf989 100644 --- a/src/DotNetCore.CAP/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/IConsumerRegister.Default.cs @@ -11,6 +11,7 @@ using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP { @@ -21,9 +22,10 @@ namespace DotNetCore.CAP private readonly IDispatcher _dispatcher; private readonly ILogger _logger; private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); + private readonly CapOptions _options; private readonly MethodMatcherCache _selector; + private readonly CancellationTokenSource _cts; - private CancellationTokenSource _cts; private string _serverAddress; private Task _compositeTask; private bool _disposed; @@ -34,17 +36,21 @@ namespace DotNetCore.CAP private static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); - public ConsumerRegister(IConsumerClientFactory consumerClientFactory, + public ConsumerRegister( + IOptions options, + IConsumerClientFactory consumerClientFactory, IDispatcher dispatcher, IStorageConnection connection, ILogger logger, MethodMatcherCache selector) { + _options = options.Value; _selector = selector; _logger = logger; _consumerClientFactory = consumerClientFactory; _dispatcher = dispatcher; _connection = connection; + _cts = new CancellationTokenSource(); } public bool IsHealthy() @@ -54,43 +60,43 @@ namespace DotNetCore.CAP public void Start() { - _cts = new CancellationTokenSource(); - var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped(); foreach (var matchGroup in groupingMatches) { - Task.Factory.StartNew(() => + for (int i = 0; i < _options.ConsumerThreadCount; i++) { - try + Task.Factory.StartNew(() => { - using (var client = _consumerClientFactory.Create(matchGroup.Key)) + try { - _serverAddress = client.ServersAddress; + using (var client = _consumerClientFactory.Create(matchGroup.Key)) + { + _serverAddress = client.ServersAddress; - RegisterMessageProcessor(client); + RegisterMessageProcessor(client); - client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); + client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); - client.Listening(_pollingDelay, _cts.Token); + client.Listening(_pollingDelay, _cts.Token); + } } - } - catch (OperationCanceledException) - { - //ignore - } - catch (BrokerConnectionException e) - { - _isHealthy = false; - _logger.LogError(e, e.Message); - } - catch (Exception e) - { - _logger.LogError(e, e.Message); - } - }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + catch (OperationCanceledException) + { + //ignore + } + catch (BrokerConnectionException e) + { + _isHealthy = false; + _logger.LogError(e, e.Message); + } + catch (Exception e) + { + _logger.LogError(e, e.Message); + } + }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + } } - _compositeTask = Task.CompletedTask; } @@ -140,6 +146,8 @@ namespace DotNetCore.CAP { client.OnMessageReceived += (sender, messageContext) => { + _cts.Token.ThrowIfCancellationRequested(); + var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew();