diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index 58dd39d..a286b04 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -13,7 +13,8 @@ - + + \ No newline at end of file diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 74e4014..2139ee1 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -2,14 +2,16 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Collections.Concurrent; +using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Processor { @@ -20,32 +22,35 @@ namespace DotNetCore.CAP.Processor private readonly ISubscribeDispatcher _executor; private readonly ILogger _logger; - private readonly BlockingCollection _publishedMessageQueue = - new BlockingCollection(new ConcurrentQueue()); - - private readonly BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)> _receivedMessageQueue = - new BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)>(new ConcurrentQueue<(MediumMessage, ConsumerExecutorDescriptor)>()); + private readonly Channel _publishedChannel; + private readonly Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel; public Dispatcher(ILogger logger, IMessageSender sender, + IOptions options, ISubscribeDispatcher executor) { _logger = logger; _sender = sender; _executor = executor; + _publishedChannel = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true }); + _receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>(); + Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + + Task.WhenAll(Enumerable.Range(0, options.Value.ConsumerThreadCount) + .Select(_ => Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); } public void EnqueueToPublish(MediumMessage message) { - _publishedMessageQueue.Add(message); + _publishedChannel.Writer.TryWrite(message); } public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor) { - _receivedMessageQueue.Add((message, descriptor)); + _receivedChannel.Writer.TryWrite((message, descriptor)); } public void Dispose() @@ -57,21 +62,23 @@ namespace DotNetCore.CAP.Processor { try { - while (!_publishedMessageQueue.IsCompleted) + while (await _publishedChannel.Reader.WaitToReadAsync(_cts.Token)) { - if (_publishedMessageQueue.TryTake(out var message, 3000, _cts.Token)) + while (_publishedChannel.Reader.TryRead(out var message)) { try { var result = await _sender.SendAsync(message); if (!result.Succeeded) { - _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); + _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), + result.Exception); } } catch (Exception ex) { - _logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}"); + _logger.LogError(ex, + $"An exception occurred when sending a message to the MQ. Id:{message.DbId}"); } } } @@ -86,9 +93,12 @@ namespace DotNetCore.CAP.Processor { try { - foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) + while (await _receivedChannel.Reader.WaitToReadAsync(_cts.Token)) { - await _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token); + while (_receivedChannel.Reader.TryRead(out var message)) + { + await _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token); + } } } catch (OperationCanceledException)