From 9c46b8fd7518e02e9970a4599747157ebdee65e0 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 31 Oct 2017 14:10:39 +0100 Subject: [PATCH] Fix handling of closed streams --- .../Adapter/MqttChannelCommunicationAdapter.cs | 6 +++++- MQTTnet.Core/Serializer/MqttPacketReader.cs | 16 +++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index d47d4ce..7084227 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -151,7 +151,7 @@ namespace MQTTnet.Core.Adapter receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false); } - if (cancellationToken.IsCancellationRequested) + if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) { throw new TaskCanceledException(); } @@ -190,6 +190,10 @@ namespace MQTTnet.Core.Adapter private static async Task ReceiveAsync(Stream stream, CancellationToken cancellationToken) { var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken); + if (header == null) + { + return null; + } if (header.BodyLength == 0) { diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index ce80cd5..97b6478 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -25,7 +25,13 @@ namespace MQTTnet.Core.Serializer public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken) { - var fixedHeader = (byte)stream.ReadByte(); + var buffer = stream.ReadByte(); + if (buffer == -1) + { + return null; + } + + var fixedHeader = (byte)buffer; var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken); @@ -83,10 +89,10 @@ namespace MQTTnet.Core.Serializer var buffer = stream.ReadByte(); readBytes.Add(buffer); - ////if (buffer == -1) - ////{ - //// break; - ////} + if (buffer == -1) + { + break; + } encodedByte = (byte)buffer; value += (byte)(encodedByte & 127) * multiplier;