From e1a9aa7d03911472a76f38e7b1cba199871d92cd Mon Sep 17 00:00:00 2001 From: kpreisser Date: Wed, 17 Jan 2018 11:10:39 +0100 Subject: [PATCH] Adjust the MqttClientPendingMessagesQueue and the MqttPacketReader to use asynchronous methods for long-running/blocking operations. Otherwise, Worker Threads from the .NET Threadpool are blocked while waiting for the next client message, and while waiting for the next message to send. --- .../Adapter/MqttChannelAdapter.cs | 2 +- .../Serializer/MqttPacketReader.cs | 23 +++++++++++-------- .../Server/MqttClientPendingMessagesQueue.cs | 16 +++++++++---- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 0cbcc93..491655d 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -120,7 +120,7 @@ namespace MQTTnet.Adapter private static async Task ReceiveAsync(Stream stream, CancellationToken cancellationToken) { - var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken); + var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken); if (header == null) { return null; diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs index a22707c..ef07848 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs @@ -22,17 +22,18 @@ namespace MQTTnet.Serializer public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength; - public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken) + public static async Task ReadHeaderFromSourceAsync(Stream stream, CancellationToken cancellationToken) { - var buffer = stream.ReadByte(); - if (buffer == -1) + byte[] singleByteBuf = new byte[1]; + var readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); + if (readCount <= 0) { return null; } - - var fixedHeader = (byte)buffer; + + var fixedHeader = singleByteBuf[0]; var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); - var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken); + var bodyLength = await ReadBodyLengthFromSourceAsync(stream, cancellationToken).ConfigureAwait(false); return new MqttPacketHeader { @@ -80,13 +81,15 @@ namespace MQTTnet.Serializer return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } - private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken) + private static async Task ReadBodyLengthFromSourceAsync(Stream stream, CancellationToken cancellationToken) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; var value = 0; byte encodedByte; + byte[] singleByteBuf = new byte[1]; + var readBytes = new List(); do { @@ -95,13 +98,13 @@ namespace MQTTnet.Serializer throw new TaskCanceledException(); } - var buffer = stream.ReadByte(); - if (buffer == -1) + int readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); + if (readCount <= 0) { throw new MqttCommunicationException("Connection closed while reading remaining length data."); } - encodedByte = (byte)buffer; + encodedByte = singleByteBuf[0]; readBytes.Add(encodedByte); value += (byte)(encodedByte & 127) * multiplier; diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 2048d52..e092d3b 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -12,7 +12,8 @@ namespace MQTTnet.Server { public sealed class MqttClientPendingMessagesQueue : IDisposable { - private readonly BlockingCollection _queue = new BlockingCollection(); + private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0); private readonly IMqttServerOptions _options; private readonly MqttClientSession _session; private readonly IMqttNetLogger _logger; @@ -40,7 +41,8 @@ namespace MQTTnet.Server { if (packet == null) throw new ArgumentNullException(nameof(packet)); - _queue.Add(packet); + _queue.Enqueue(packet); + _queueWaitSemaphore.Release(); _logger.Trace("Enqueued packet (ClientId: {0}).", _session.ClientId); } @@ -67,7 +69,10 @@ namespace MQTTnet.Server MqttBasePacket packet = null; try { - packet = _queue.Take(cancellationToken); + await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + if (!_queue.TryDequeue(out packet)) { + throw new InvalidOperationException(); // should not happen + } await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); _logger.Trace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); @@ -95,7 +100,8 @@ namespace MQTTnet.Server if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { publishPacket.Dup = true; - _queue.Add(packet, CancellationToken.None); + _queue.Enqueue(packet); + _queueWaitSemaphore.Release(); } } @@ -105,7 +111,7 @@ namespace MQTTnet.Server public void Dispose() { - _queue?.Dispose(); + _queueWaitSemaphore?.Dispose(); } } }