Browse Source

Throw MqttCommunicationException if access to underlying source failed.

release/3.x.x
Christian Kratky 7 years ago
parent
commit
93a3bbd849
1 changed files with 55 additions and 43 deletions
  1. +55
    -43
      MQTTnet.Core/Serializer/MqttPacketReader.cs

+ 55
- 43
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -13,6 +13,8 @@ namespace MQTTnet.Core.Serializer
private readonly MemoryStream _remainingData = new MemoryStream(); private readonly MemoryStream _remainingData = new MemoryStream();
private readonly IMqttCommunicationChannel _source; private readonly IMqttCommunicationChannel _source;


private int _remainingLength;

public MqttPacketReader(IMqttCommunicationChannel source) public MqttPacketReader(IMqttCommunicationChannel source)
{ {
_source = source ?? throw new ArgumentNullException(nameof(source)); _source = source ?? throw new ArgumentNullException(nameof(source));
@@ -22,8 +24,6 @@ namespace MQTTnet.Core.Serializer


public byte FixedHeader { get; private set; } public byte FixedHeader { get; private set; }


public int RemainingLength { get; private set; }

public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length; public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length;


public async Task ReadToEndAsync() public async Task ReadToEndAsync()
@@ -31,54 +31,18 @@ namespace MQTTnet.Core.Serializer
await ReadFixedHeaderAsync(); await ReadFixedHeaderAsync();
await ReadRemainingLengthAsync(); await ReadRemainingLengthAsync();


if (RemainingLength == 0)
if (_remainingLength == 0)
{ {
return; return;
} }


var buffer = new byte[RemainingLength];
await _source.ReadAsync(buffer);
var buffer = new byte[_remainingLength];
await ReadFromSourceAsync(buffer);
_remainingData.Write(buffer, 0, buffer.Length); _remainingData.Write(buffer, 0, buffer.Length);
_remainingData.Position = 0; _remainingData.Position = 0;
} }


private async Task ReadFixedHeaderAsync()
{
FixedHeader = await ReadStreamByteAsync();

var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4);
ControlPacketType = (MqttControlPacketType)byteReader.Read(4);
}

private async Task<byte> ReadStreamByteAsync()
{
var buffer = new byte[1];
await _source.ReadAsync(buffer);
return buffer[0];
}

private async Task ReadRemainingLengthAsync()
{
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1;
var value = 0;
byte encodedByte;
do
{
encodedByte = await ReadStreamByteAsync();
value += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)
{
throw new MqttProtocolViolationException("Remaining length is ivalid.");
}
} while ((encodedByte & 128) != 0);

RemainingLength = value;
}

public async Task<byte> ReadRemainingDataByteAsync() public async Task<byte> ReadRemainingDataByteAsync()
{ {
return (await ReadRemainingDataAsync(1))[0]; return (await ReadRemainingDataAsync(1))[0];
@@ -109,7 +73,7 @@ namespace MQTTnet.Core.Serializer


public Task<byte[]> ReadRemainingDataAsync() public Task<byte[]> ReadRemainingDataAsync()
{ {
return ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position);
return ReadRemainingDataAsync(_remainingLength - (int)_remainingData.Position);
} }


public async Task<byte[]> ReadRemainingDataAsync(int length) public async Task<byte[]> ReadRemainingDataAsync(int length)
@@ -120,6 +84,54 @@ namespace MQTTnet.Core.Serializer
return buffer; return buffer;
} }


private async Task ReadRemainingLengthAsync()
{
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1;
var value = 0;
byte encodedByte;
do
{
encodedByte = await ReadStreamByteAsync();
value += (encodedByte & 127) * multiplier;
multiplier *= 128;
if (multiplier > 128 * 128 * 128)
{
throw new MqttProtocolViolationException("Remaining length is ivalid.");
}
} while ((encodedByte & 128) != 0);

_remainingLength = value;
}

private Task ReadFromSourceAsync(byte[] buffer)
{
try
{
return _source.ReadAsync(buffer);
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
}

private async Task<byte> ReadStreamByteAsync()
{
var buffer = new byte[1];
await ReadFromSourceAsync(buffer);
return buffer[0];
}

private async Task ReadFixedHeaderAsync()
{
FixedHeader = await ReadStreamByteAsync();

var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4);
ControlPacketType = (MqttControlPacketType)byteReader.Read(4);
}

public void Dispose() public void Dispose()
{ {
_remainingData?.Dispose(); _remainingData?.Dispose();


Loading…
Cancel
Save