Browse Source

Adjust the MqttClientPendingMessagesQueue and the MqttPacketReader to use asynchronous methods for long-running/blocking operations.

Otherwise, Worker Threads from the .NET Threadpool are blocked while waiting for the next client message, and while waiting for the next message to send.
release/3.x.x
kpreisser 7 years ago
parent
commit
e1a9aa7d03
3 changed files with 25 additions and 16 deletions
  1. +1
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +13
    -10
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
  3. +11
    -5
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -120,7 +120,7 @@ namespace MQTTnet.Adapter


private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken) private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
{ {
var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken);
var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken);
if (header == null) if (header == null)
{ {
return null; return null;


+ 13
- 10
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs View File

@@ -22,17 +22,18 @@ namespace MQTTnet.Serializer


public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength; public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength;


public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken)
public static async Task<MqttPacketHeader> ReadHeaderFromSourceAsync(Stream stream, CancellationToken cancellationToken)
{ {
var buffer = stream.ReadByte();
if (buffer == -1)
byte[] singleByteBuf = new byte[1];
var readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false);
if (readCount <= 0)
{ {
return null; return null;
} }
var fixedHeader = (byte)buffer;
var fixedHeader = singleByteBuf[0];
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4);
var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken);
var bodyLength = await ReadBodyLengthFromSourceAsync(stream, cancellationToken).ConfigureAwait(false);


return new MqttPacketHeader return new MqttPacketHeader
{ {
@@ -80,13 +81,15 @@ namespace MQTTnet.Serializer
return ReadBytes(_header.BodyLength - (int)BaseStream.Position); return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
} }


private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken)
private static async Task<int> ReadBodyLengthFromSourceAsync(Stream stream, CancellationToken cancellationToken)
{ {
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1; var multiplier = 1;
var value = 0; var value = 0;
byte encodedByte; byte encodedByte;


byte[] singleByteBuf = new byte[1];

var readBytes = new List<byte>(); var readBytes = new List<byte>();
do do
{ {
@@ -95,13 +98,13 @@ namespace MQTTnet.Serializer
throw new TaskCanceledException(); throw new TaskCanceledException();
} }


var buffer = stream.ReadByte();
if (buffer == -1)
int readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false);
if (readCount <= 0)
{ {
throw new MqttCommunicationException("Connection closed while reading remaining length data."); throw new MqttCommunicationException("Connection closed while reading remaining length data.");
} }


encodedByte = (byte)buffer;
encodedByte = singleByteBuf[0];
readBytes.Add(encodedByte); readBytes.Add(encodedByte);


value += (byte)(encodedByte & 127) * multiplier; value += (byte)(encodedByte & 127) * multiplier;


+ 11
- 5
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -12,7 +12,8 @@ namespace MQTTnet.Server
{ {
public sealed class MqttClientPendingMessagesQueue : IDisposable public sealed class MqttClientPendingMessagesQueue : IDisposable
{ {
private readonly BlockingCollection<MqttBasePacket> _queue = new BlockingCollection<MqttBasePacket>();
private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly MqttClientSession _session; private readonly MqttClientSession _session;
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;
@@ -40,7 +41,8 @@ namespace MQTTnet.Server
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));


_queue.Add(packet);
_queue.Enqueue(packet);
_queueWaitSemaphore.Release();
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId); _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
} }


@@ -67,7 +69,10 @@ namespace MQTTnet.Server
MqttBasePacket packet = null; MqttBasePacket packet = null;
try try
{ {
packet = _queue.Take(cancellationToken);
await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
if (!_queue.TryDequeue(out packet)) {
throw new InvalidOperationException(); // should not happen
}
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);


_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId); _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
@@ -95,7 +100,8 @@ namespace MQTTnet.Server
if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{ {
publishPacket.Dup = true; publishPacket.Dup = true;
_queue.Add(packet, CancellationToken.None);
_queue.Enqueue(packet);
_queueWaitSemaphore.Release();
} }
} }


@@ -105,7 +111,7 @@ namespace MQTTnet.Server


public void Dispose() public void Dispose()
{ {
_queue?.Dispose();
_queueWaitSemaphore?.Dispose();
} }
} }
} }

Loading…
Cancel
Save