Przeglądaj źródła

Fix handling of closed streams

release/3.x.x
Christian Kratky 7 lat temu
rodzic
commit
9c46b8fd75
2 zmienionych plików z 16 dodań i 6 usunięć
  1. +5
    -1
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  2. +11
    -5
      MQTTnet.Core/Serializer/MqttPacketReader.cs

+ 5
- 1
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs Wyświetl plik

@@ -151,7 +151,7 @@ namespace MQTTnet.Core.Adapter
receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, cancellationToken).ConfigureAwait(false);
}

if (cancellationToken.IsCancellationRequested)
if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}
@@ -190,6 +190,10 @@ namespace MQTTnet.Core.Adapter
private static async Task<ReceivedMqttPacket> ReceiveAsync(Stream stream, CancellationToken cancellationToken)
{
var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken);
if (header == null)
{
return null;
}

if (header.BodyLength == 0)
{


+ 11
- 5
MQTTnet.Core/Serializer/MqttPacketReader.cs Wyświetl plik

@@ -25,7 +25,13 @@ namespace MQTTnet.Core.Serializer

public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken)
{
var fixedHeader = (byte)stream.ReadByte();
var buffer = stream.ReadByte();
if (buffer == -1)
{
return null;
}

var fixedHeader = (byte)buffer;
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4);
var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken);

@@ -83,10 +89,10 @@ namespace MQTTnet.Core.Serializer
var buffer = stream.ReadByte();
readBytes.Add(buffer);

////if (buffer == -1)
////{
//// break;
////}
if (buffer == -1)
{
break;
}

encodedByte = (byte)buffer;
value += (byte)(encodedByte & 127) * multiplier;


Ładowanie…
Anuluj
Zapisz