diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index f7fbdb2..417c19a 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -1,6 +1,8 @@ using System; using System.IO; using System.Text; +using System.Threading; +using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Protocol; @@ -20,11 +22,11 @@ namespace MQTTnet.Core.Serializer public bool EndOfRemainingData => BaseStream.Position == _receivedMqttPacket.Header.BodyLength; - public static MqttPacketHeader ReadHeaderFromSource(Stream stream) + public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken) { var fixedHeader = (byte)stream.ReadByte(); var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); - var bodyLength = ReadBodyLengthFromSource(stream); + var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken); return new MqttPacketHeader { @@ -62,7 +64,7 @@ namespace MQTTnet.Core.Serializer return ReadBytes(_receivedMqttPacket.Header.BodyLength - (int)BaseStream.Position); } - private static int ReadBodyLengthFromSource(Stream stream) + private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; @@ -70,6 +72,11 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + encodedByte = (byte)stream.ReadByte(); value += (encodedByte & 127) * multiplier; multiplier *= 128; diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index fcec63c..f43fc72 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -546,6 +546,8 @@ namespace MQTTnet.Core.Serializer private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer) { + if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); + writer.Write(packet.PacketIdentifier); if (packet.TopicFilters?.Count > 0) @@ -577,6 +579,8 @@ namespace MQTTnet.Core.Serializer private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer) { + if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); + writer.Write(packet.PacketIdentifier); if (packet.TopicFilters?.Any() == true)