Browse Source

Make deserialization cancleable

release/3.x.x
Christian Kratky 7 years ago
parent
commit
83f1d74a76
2 changed files with 14 additions and 3 deletions
  1. +10
    -3
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  2. +4
    -0
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs

+ 10
- 3
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -1,6 +1,8 @@
using System; using System;
using System.IO; using System.IO;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
@@ -20,11 +22,11 @@ namespace MQTTnet.Core.Serializer


public bool EndOfRemainingData => BaseStream.Position == _receivedMqttPacket.Header.BodyLength; public bool EndOfRemainingData => BaseStream.Position == _receivedMqttPacket.Header.BodyLength;


public static MqttPacketHeader ReadHeaderFromSource(Stream stream)
public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken)
{ {
var fixedHeader = (byte)stream.ReadByte(); var fixedHeader = (byte)stream.ReadByte();
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4);
var bodyLength = ReadBodyLengthFromSource(stream);
var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken);


return new MqttPacketHeader return new MqttPacketHeader
{ {
@@ -62,7 +64,7 @@ namespace MQTTnet.Core.Serializer
return ReadBytes(_receivedMqttPacket.Header.BodyLength - (int)BaseStream.Position); return ReadBytes(_receivedMqttPacket.Header.BodyLength - (int)BaseStream.Position);
} }


private static int ReadBodyLengthFromSource(Stream stream)
private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken)
{ {
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1; var multiplier = 1;
@@ -70,6 +72,11 @@ namespace MQTTnet.Core.Serializer
byte encodedByte; byte encodedByte;
do do
{ {
if (cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}

encodedByte = (byte)stream.ReadByte(); encodedByte = (byte)stream.ReadByte();
value += (encodedByte & 127) * multiplier; value += (encodedByte & 127) * multiplier;
multiplier *= 128; multiplier *= 128;


+ 4
- 0
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -546,6 +546,8 @@ namespace MQTTnet.Core.Serializer


private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer) private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer)
{ {
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

writer.Write(packet.PacketIdentifier); writer.Write(packet.PacketIdentifier);


if (packet.TopicFilters?.Count > 0) if (packet.TopicFilters?.Count > 0)
@@ -577,6 +579,8 @@ namespace MQTTnet.Core.Serializer


private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer) private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer)
{ {
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

writer.Write(packet.PacketIdentifier); writer.Write(packet.PacketIdentifier);


if (packet.TopicFilters?.Any() == true) if (packet.TopicFilters?.Any() == true)


Loading…
Cancel
Save