|
|
@@ -12,7 +12,7 @@ namespace MQTTnet.Serializer |
|
|
|
{ |
|
|
|
public static class MqttPacketReader |
|
|
|
{ |
|
|
|
public static async Task<MqttPacketHeader> ReadHeaderAsync(IMqttChannel stream, CancellationToken cancellationToken) |
|
|
|
public static async Task<MqttPacketHeader> ReadHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
if (cancellationToken.IsCancellationRequested) |
|
|
|
{ |
|
|
@@ -23,20 +23,26 @@ namespace MQTTnet.Serializer |
|
|
|
// some large delay and thus the thread should be put back to the pool (await). So ReadByte() |
|
|
|
// is not an option here. |
|
|
|
var buffer = new byte[1]; |
|
|
|
var readCount = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); |
|
|
|
var readCount = await channel.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); |
|
|
|
if (readCount <= 0) |
|
|
|
{ |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
var fixedHeader = buffer[0]; |
|
|
|
var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); |
|
|
|
var bodyLength = await ReadBodyLengthAsync(stream, cancellationToken).ConfigureAwait(false); |
|
|
|
var controlPacketType = fixedHeader >> 4; |
|
|
|
|
|
|
|
if (controlPacketType < 1 || controlPacketType > 14) |
|
|
|
{ |
|
|
|
throw new MqttProtocolViolationException($"The packet type is invalid ({controlPacketType})."); |
|
|
|
} |
|
|
|
|
|
|
|
var bodyLength = await ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false); |
|
|
|
|
|
|
|
return new MqttPacketHeader |
|
|
|
{ |
|
|
|
FixedHeader = fixedHeader, |
|
|
|
ControlPacketType = controlPacketType, |
|
|
|
ControlPacketType = (MqttControlPacketType)controlPacketType, |
|
|
|
BodyLength = bodyLength |
|
|
|
}; |
|
|
|
} |
|
|
|