diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index bf38bd0..f59d807 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -13,6 +13,8 @@ namespace MQTTnet.Core.Serializer private readonly MemoryStream _remainingData = new MemoryStream(); private readonly IMqttCommunicationChannel _source; + private int _remainingLength; + public MqttPacketReader(IMqttCommunicationChannel source) { _source = source ?? throw new ArgumentNullException(nameof(source)); @@ -22,8 +24,6 @@ namespace MQTTnet.Core.Serializer public byte FixedHeader { get; private set; } - public int RemainingLength { get; private set; } - public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length; public async Task ReadToEndAsync() @@ -31,54 +31,18 @@ namespace MQTTnet.Core.Serializer await ReadFixedHeaderAsync(); await ReadRemainingLengthAsync(); - if (RemainingLength == 0) + if (_remainingLength == 0) { return; } - var buffer = new byte[RemainingLength]; - await _source.ReadAsync(buffer); - + var buffer = new byte[_remainingLength]; + await ReadFromSourceAsync(buffer); + _remainingData.Write(buffer, 0, buffer.Length); _remainingData.Position = 0; } - private async Task ReadFixedHeaderAsync() - { - FixedHeader = await ReadStreamByteAsync(); - - var byteReader = new ByteReader(FixedHeader); - byteReader.Read(4); - ControlPacketType = (MqttControlPacketType)byteReader.Read(4); - } - - private async Task ReadStreamByteAsync() - { - var buffer = new byte[1]; - await _source.ReadAsync(buffer); - return buffer[0]; - } - - private async Task ReadRemainingLengthAsync() - { - // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. - var multiplier = 1; - var value = 0; - byte encodedByte; - do - { - encodedByte = await ReadStreamByteAsync(); - value += (encodedByte & 127) * multiplier; - multiplier *= 128; - if (multiplier > 128 * 128 * 128) - { - throw new MqttProtocolViolationException("Remaining length is ivalid."); - } - } while ((encodedByte & 128) != 0); - - RemainingLength = value; - } - public async Task ReadRemainingDataByteAsync() { return (await ReadRemainingDataAsync(1))[0]; @@ -109,7 +73,7 @@ namespace MQTTnet.Core.Serializer public Task ReadRemainingDataAsync() { - return ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position); + return ReadRemainingDataAsync(_remainingLength - (int)_remainingData.Position); } public async Task ReadRemainingDataAsync(int length) @@ -120,6 +84,54 @@ namespace MQTTnet.Core.Serializer return buffer; } + private async Task ReadRemainingLengthAsync() + { + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var multiplier = 1; + var value = 0; + byte encodedByte; + do + { + encodedByte = await ReadStreamByteAsync(); + value += (encodedByte & 127) * multiplier; + multiplier *= 128; + if (multiplier > 128 * 128 * 128) + { + throw new MqttProtocolViolationException("Remaining length is ivalid."); + } + } while ((encodedByte & 128) != 0); + + _remainingLength = value; + } + + private Task ReadFromSourceAsync(byte[] buffer) + { + try + { + return _source.ReadAsync(buffer); + } + catch (Exception exception) + { + throw new MqttCommunicationException(exception); + } + } + + private async Task ReadStreamByteAsync() + { + var buffer = new byte[1]; + await ReadFromSourceAsync(buffer); + return buffer[0]; + } + + private async Task ReadFixedHeaderAsync() + { + FixedHeader = await ReadStreamByteAsync(); + + var byteReader = new ByteReader(FixedHeader); + byteReader.Read(4); + ControlPacketType = (MqttControlPacketType)byteReader.Read(4); + } + public void Dispose() { _remainingData?.Dispose();