Browse Source

Merge pull request #936 from dotnetcore/supports/flow-control

Improve flow control for message of in memory
master
xiangxiren 3 years ago
committed by GitHub
parent
commit
e636e94fd6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 11 deletions
  1. +9
    -2
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
  2. +3
    -2
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  3. +54
    -7
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

+ 9
- 2
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs View File

@@ -81,16 +81,23 @@ namespace DotNetCore.CAP.RabbitMQ

public void Commit(object sender)
{
_channel.BasicAck((ulong)sender, false);
if (_channel.IsOpen)
{
_channel.BasicAck((ulong)sender, false);
}
}

public void Reject(object sender)
{
_channel.BasicReject((ulong)sender, true);
if (_channel.IsOpen)
{
_channel.BasicReject((ulong)sender, true);
}
}

public void Dispose()
{

_channel?.Dispose();
//The connection should not be closed here, because the connection is still in use elsewhere.
//_connection?.Dispose();


+ 3
- 2
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -165,7 +165,8 @@ namespace DotNetCore.CAP.Internal

private void RegisterMessageProcessor(IConsumerClient client)
{
client.OnMessageReceived += async (sender, transportMessage) =>
// Cannot set subscription to asynchronous
client.OnMessageReceived += (sender, transportMessage) =>
{
_logger.MessageReceived(transportMessage.GetId(), transportMessage.GetName());

@@ -193,7 +194,7 @@ namespace DotNetCore.CAP.Internal
}

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
message = await _serializer.DeserializeAsync(transportMessage, type);
message = _serializer.DeserializeAsync(transportMessage, type).GetAwaiter().GetResult();
message.RemoveException();
}
catch (Exception e)


+ 54
- 7
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs View File

@@ -21,12 +21,11 @@ namespace DotNetCore.CAP.Processor
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

private Channel<MediumMessage> _publishedChannel;
private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;

private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
IOptions<CapOptions> options,
@@ -41,10 +40,26 @@ namespace DotNetCore.CAP.Processor
public void Start(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
stoppingToken.Register(() => _cts.Cancel());
stoppingToken.Register(() => _cts.Cancel());

var capacity = _options.ProducerThreadCount * 500;
_publishedChannel = Channel.CreateBounded<MediumMessage>(new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ProducerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

capacity = _options.ConsumerThreadCount * 300;
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor)>(new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ConsumerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true });
_receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>();

Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
@@ -55,12 +70,44 @@ namespace DotNetCore.CAP.Processor

public void EnqueueToPublish(MediumMessage message)
{
_publishedChannel.Writer.TryWrite(message);
try
{
if (!_publishedChannel.Writer.TryWrite(message))
{
while (_publishedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (_publishedChannel.Writer.TryWrite(message))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor)
{
_receivedChannel.Writer.TryWrite((message, descriptor));
try
{
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
{
while (_receivedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (_receivedChannel.Writer.TryWrite((message, descriptor)))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

private async Task Sending(CancellationToken cancellationToken)


Loading…
Cancel
Save