From 0505efbac1e91bb2962c6af11de046c13795333c Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 31 May 2018 21:34:51 +0200 Subject: [PATCH] Refactor graceful socket close. --- .../Adapter/MqttChannelAdapter.cs | 15 ++++------- .../Internal/ExceptionHelper.cs | 12 +++++++++ .../Serializer/MqttPacketReader.cs | 26 ++++++++----------- 3 files changed, 28 insertions(+), 25 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 7b8f08a..92c8f88 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; +using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Serializer; @@ -133,18 +134,14 @@ namespace MQTTnet.Adapter private async Task ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) { var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false); - if (!fixedHeader.HasValue) - { - return null; - } - + ReadingPacketStarted?.Invoke(this, EventArgs.Empty); try { var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false); if (bodyLength == 0) { - return new ReceivedMqttPacket(fixedHeader.Value, null); + return new ReceivedMqttPacket(fixedHeader, null); } var body = new MemoryStream(bodyLength); @@ -159,11 +156,9 @@ namespace MQTTnet.Adapter } var readBytesCount = await channel.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false); - - // Check if the client closed the connection before sending the full body. if (readBytesCount <= 0) { - throw new MqttCommunicationException("Connection closed while reading remaining packet body."); + ExceptionHelper.ThrowGracefulSocketClose(); } // Here is no need to await because internally only an array is used and no real I/O operation is made. @@ -173,7 +168,7 @@ namespace MQTTnet.Adapter body.Seek(0L, SeekOrigin.Begin); - return new ReceivedMqttPacket(fixedHeader.Value, body); + return new ReceivedMqttPacket(fixedHeader, body); } finally { diff --git a/Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs b/Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs new file mode 100644 index 0000000..2b50451 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs @@ -0,0 +1,12 @@ +using MQTTnet.Exceptions; + +namespace MQTTnet.Internal +{ + public static class ExceptionHelper + { + public static void ThrowGracefulSocketClose() + { + throw new MqttCommunicationException("Connection gracefully closed from the remote party."); + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs index 432dc90..4b3b6b1 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs @@ -5,18 +5,14 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Exceptions; +using MQTTnet.Internal; namespace MQTTnet.Serializer { public static class MqttPacketReader { - public static async Task ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) + public static async Task ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) { - if (cancellationToken.IsCancellationRequested) - { - return null; - } - // Wait for the next package which starts with the header. At this point there will probably // some large delay and thus the thread should be put back to the pool (await). So ReadByte() // is not an option here. @@ -24,7 +20,7 @@ namespace MQTTnet.Serializer var readCount = await channel.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); if (readCount <= 0) { - return null; + ExceptionHelper.ThrowGracefulSocketClose(); } return buffer[0]; @@ -68,13 +64,6 @@ namespace MQTTnet.Serializer return stream.ReadBytes((int)(stream.Length - stream.Position)); } - public static byte[] ReadBytes(this Stream stream, int count) - { - var buffer = new byte[count]; - stream.Read(buffer, 0, count); - return buffer; - } - public static async Task ReadBodyLengthAsync(IMqttChannel channel, CancellationToken cancellationToken) { // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. @@ -93,7 +82,7 @@ namespace MQTTnet.Serializer var readCount = await channel.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); if (readCount <= 0) { - throw new MqttCommunicationException("Connection closed while reading remaining length data."); + ExceptionHelper.ThrowGracefulSocketClose(); } encodedByte = buffer[0]; @@ -109,5 +98,12 @@ namespace MQTTnet.Serializer return value; } + + private static byte[] ReadBytes(this Stream stream, int count) + { + var buffer = new byte[count]; + stream.Read(buffer, 0, count); + return buffer; + } } }