|
|
@@ -24,14 +24,17 @@ namespace MQTTnet.Serializer |
|
|
|
|
|
|
|
public static async Task<MqttPacketHeader> ReadHeaderFromSourceAsync(Stream stream, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
byte[] singleByteBuf = new byte[1]; |
|
|
|
var readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); |
|
|
|
// Wait for the next package which starts with the header. At this point there will probably |
|
|
|
// some large delay and thus the thread should be put back to the pool (await). So ReadByte() |
|
|
|
// is not an option here. |
|
|
|
var buffer = new byte[1]; |
|
|
|
var readCount = await stream.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); |
|
|
|
if (readCount <= 0) |
|
|
|
{ |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
var fixedHeader = singleByteBuf[0]; |
|
|
|
var fixedHeader = buffer[0]; |
|
|
|
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); |
|
|
|
var bodyLength = await ReadBodyLengthFromSourceAsync(stream, cancellationToken).ConfigureAwait(false); |
|
|
|
|
|
|
@@ -88,8 +91,7 @@ namespace MQTTnet.Serializer |
|
|
|
var value = 0; |
|
|
|
byte encodedByte; |
|
|
|
|
|
|
|
byte[] singleByteBuf = new byte[1]; |
|
|
|
|
|
|
|
var buffer = new byte[1]; |
|
|
|
var readBytes = new List<byte>(); |
|
|
|
do |
|
|
|
{ |
|
|
@@ -98,13 +100,13 @@ namespace MQTTnet.Serializer |
|
|
|
throw new TaskCanceledException(); |
|
|
|
} |
|
|
|
|
|
|
|
int readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); |
|
|
|
var readCount = await stream.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); |
|
|
|
if (readCount <= 0) |
|
|
|
{ |
|
|
|
throw new MqttCommunicationException("Connection closed while reading remaining length data."); |
|
|
|
} |
|
|
|
|
|
|
|
encodedByte = singleByteBuf[0]; |
|
|
|
encodedByte = buffer[0]; |
|
|
|
readBytes.Add(encodedByte); |
|
|
|
|
|
|
|
value += (byte)(encodedByte & 127) * multiplier; |
|
|
|