diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 7396515..f909db1 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -97,13 +97,11 @@ namespace MQTTnet.Adapter await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { - _logger.Verbose("TX >>> {0}", packet); - var packetData = PacketSerializer.Serialize(packet); - await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false); - PacketSerializer.FreeBuffer(); + + _logger.Verbose("TX >>> {0}", packet); } catch (Exception exception) { @@ -149,7 +147,7 @@ namespace MQTTnet.Adapter } _logger.Verbose("RX <<< {0}", packet); - + return packet; } catch (Exception exception) @@ -190,9 +188,16 @@ namespace MQTTnet.Adapter chunkSize = bytesLeft; } +#if WINDOWS_UWP + var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); +#else // async/await is not used to avoid the overhead of context switches. We assume that the reamining data // has been sent from the sender directly after the initial bytes. - var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).GetAwaiter().GetResult(); + var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); +#endif + + cancellationToken.ThrowIfCancellationRequested(); + if (readBytes <= 0) { ExceptionHelper.ThrowGracefulSocketClose(); diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index ff8b54b..73e8a66 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -34,50 +34,104 @@ namespace MQTTnet.Serializer { return new MqttFixedHeader(buffer[0], 0); } - + +#if WINDOWS_UWP + // UWP will have a dead lock when calling this not async. + var bodyLength = await ReadBodyLengthAsync(channel, buffer[1], singleByteBuffer, cancellationToken).ConfigureAwait(false); +#else + // Here the async/await pattern is not used becuase the overhead of context switches + // is too big for reading 1 byte in a row. We expect that the remaining data was sent + // directly after the initial bytes. If the client disconnects just in this moment we + // will get an exception anyway. var bodyLength = ReadBodyLength(channel, buffer[1], singleByteBuffer, cancellationToken); +#endif + return new MqttFixedHeader(buffer[0], bodyLength); } +#if !WINDOWS_UWP private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken) { - // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. + var offset = 0; var multiplier = 128; var value = initialEncodedByte & 127; int encodedByte = initialEncodedByte; while ((encodedByte & 128) != 0) { + offset++; + if (offset > 3) + { + throw new MqttProtocolViolationException("Remaining length is invalid."); + } + cancellationToken.ThrowIfCancellationRequested(); - // Here the async/await pattern is not used becuase the overhead of context switches - // is too big for reading 1 byte in a row. We expect that the remaining data was sent - // directly after the initial bytes. If the client disconnects just in this moment we - // will get an exception anyway. encodedByte = ReadByte(channel, singleByteBuffer, cancellationToken); value += (byte)(encodedByte & 127) * multiplier; - if (multiplier > 128 * 128 * 128) + multiplier *= 128; + } + + return value; + } + + private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) + { + var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); + + cancellationToken.ThrowIfCancellationRequested(); + + if (readCount <= 0) + { + ExceptionHelper.ThrowGracefulSocketClose(); + } + + return singleByteBuffer[0]; + } + +#else + + private static async Task ReadBodyLengthAsync(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken) + { + var offset = 0; + var multiplier = 128; + var value = initialEncodedByte & 127; + int encodedByte = initialEncodedByte; + + while ((encodedByte & 128) != 0) + { + offset++; + if (offset > 3) { throw new MqttProtocolViolationException("Remaining length is invalid."); } + cancellationToken.ThrowIfCancellationRequested(); + + encodedByte = await ReadByteAsync(channel, singleByteBuffer, cancellationToken).ConfigureAwait(false); + + value += (byte)(encodedByte & 127) * multiplier; multiplier *= 128; } return value; } - private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) + private static async Task ReadByteAsync(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) { - var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); + var readCount = await channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + if (readCount <= 0) { - cancellationToken.ThrowIfCancellationRequested(); ExceptionHelper.ThrowGracefulSocketClose(); } return singleByteBuffer[0]; } + +#endif } } diff --git a/Source/MQTTnet/Serializer/MqttPacketWriter.cs b/Source/MQTTnet/Serializer/MqttPacketWriter.cs index c7f1bc2..67fd467 100644 --- a/Source/MQTTnet/Serializer/MqttPacketWriter.cs +++ b/Source/MQTTnet/Serializer/MqttPacketWriter.cs @@ -29,7 +29,6 @@ namespace MQTTnet.Serializer public static ArraySegment EncodeRemainingLength(int length) { - // write the encoded remaining length right aligned on the 4 byte buffer if (length <= 0) { return new ArraySegment(new byte[1], 0, 1); @@ -38,7 +37,6 @@ namespace MQTTnet.Serializer var buffer = new byte[4]; var bufferOffset = 0; - // Algorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var x = length; do {