Browse Source

Support multiple consumer threads. (#295)

master
Savorboard 5 years ago
parent
commit
075c827577
2 changed files with 46 additions and 52 deletions
  1. +11
    -25
      src/DotNetCore.CAP/CAP.Options.cs
  2. +35
    -27
      src/DotNetCore.CAP/IConsumerRegister.Default.cs

+ 11
- 25
src/DotNetCore.CAP/CAP.Options.cs View File

@@ -14,34 +14,14 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public class CapOptions public class CapOptions
{ {
/// <summary>
/// Default succeeded message expiration time span, in seconds.
/// </summary>
public const int DefaultSucceedMessageExpirationAfter = 24 * 3600;

/// <summary>
/// Failed message retry waiting interval.
/// </summary>
public const int DefaultFailedMessageWaitingInterval = 60;

/// <summary>
/// Failed message retry count.
/// </summary>
public const int DefaultFailedRetryCount = 50;

/// <summary>
/// Default version
/// </summary>
public const string DefaultVersion = "v1";


public CapOptions() public CapOptions()
{ {
SucceedMessageExpiredAfter = DefaultSucceedMessageExpirationAfter;
FailedRetryInterval = DefaultFailedMessageWaitingInterval;
FailedRetryCount = DefaultFailedRetryCount;
SucceedMessageExpiredAfter = 24 * 3600;
FailedRetryInterval = 60;
FailedRetryCount = 50;
ConsumerThreadCount = 1;
Extensions = new List<ICapOptionsExtension>(); Extensions = new List<ICapOptionsExtension>();
Version = DefaultVersion;
Version = "v1";
DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower();
} }


@@ -80,6 +60,12 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
public int FailedRetryCount { get; set; } public int FailedRetryCount { get; set; }


/// <summary>
/// The number of consumer thread connections.
/// Default is 1
/// </summary>
public int ConsumerThreadCount { get; set; }

/// <summary> /// <summary>
/// Registers an extension that will be executed when building services. /// Registers an extension that will be executed when building services.
/// </summary> /// </summary>


+ 35
- 27
src/DotNetCore.CAP/IConsumerRegister.Default.cs View File

@@ -11,6 +11,7 @@ using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal; using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;


namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
@@ -21,9 +22,10 @@ namespace DotNetCore.CAP
private readonly IDispatcher _dispatcher; private readonly IDispatcher _dispatcher;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
private readonly CancellationTokenSource _cts;


private CancellationTokenSource _cts;
private string _serverAddress; private string _serverAddress;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
@@ -34,17 +36,21 @@ namespace DotNetCore.CAP
private static readonly DiagnosticListener s_diagnosticListener = private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);


public ConsumerRegister(IConsumerClientFactory consumerClientFactory,
public ConsumerRegister(
IOptions<CapOptions> options,
IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher, IDispatcher dispatcher,
IStorageConnection connection, IStorageConnection connection,
ILogger<ConsumerRegister> logger, ILogger<ConsumerRegister> logger,
MethodMatcherCache selector) MethodMatcherCache selector)
{ {
_options = options.Value;
_selector = selector; _selector = selector;
_logger = logger; _logger = logger;
_consumerClientFactory = consumerClientFactory; _consumerClientFactory = consumerClientFactory;
_dispatcher = dispatcher; _dispatcher = dispatcher;
_connection = connection; _connection = connection;
_cts = new CancellationTokenSource();
} }


public bool IsHealthy() public bool IsHealthy()
@@ -54,43 +60,43 @@ namespace DotNetCore.CAP


public void Start() public void Start()
{ {
_cts = new CancellationTokenSource();

var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped(); var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped();


foreach (var matchGroup in groupingMatches) foreach (var matchGroup in groupingMatches)
{ {
Task.Factory.StartNew(() =>
for (int i = 0; i < _options.ConsumerThreadCount; i++)
{ {
try
Task.Factory.StartNew(() =>
{ {
using (var client = _consumerClientFactory.Create(matchGroup.Key))
try
{ {
_serverAddress = client.ServersAddress;
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
_serverAddress = client.ServersAddress;


RegisterMessageProcessor(client);
RegisterMessageProcessor(client);


client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));


client.Listening(_pollingDelay, _cts.Token);
client.Listening(_pollingDelay, _cts.Token);
}
} }
}
catch (OperationCanceledException)
{
//ignore
}
catch (BrokerConnectionException e)
{
_isHealthy = false;
_logger.LogError(e, e.Message);
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
catch (OperationCanceledException)
{
//ignore
}
catch (BrokerConnectionException e)
{
_isHealthy = false;
_logger.LogError(e, e.Message);
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
} }

_compositeTask = Task.CompletedTask; _compositeTask = Task.CompletedTask;
} }


@@ -140,6 +146,8 @@ namespace DotNetCore.CAP
{ {
client.OnMessageReceived += (sender, messageContext) => client.OnMessageReceived += (sender, messageContext) =>
{ {
_cts.Token.ThrowIfCancellationRequested();

var startTime = DateTimeOffset.UtcNow; var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();




Loading…
Cancel
Save