|
@@ -21,8 +21,6 @@ namespace MQTTnet.Formatter |
|
|
{ |
|
|
{ |
|
|
// The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. |
|
|
// The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. |
|
|
// So in all cases at least 2 bytes must be read for a complete MQTT packet. |
|
|
// So in all cases at least 2 bytes must be read for a complete MQTT packet. |
|
|
// async/await is used here because the next packet is received in a couple of minutes so the performance |
|
|
|
|
|
// impact is acceptable according to a useless waiting thread. |
|
|
|
|
|
var buffer = fixedHeaderBuffer; |
|
|
var buffer = fixedHeaderBuffer; |
|
|
var totalBytesRead = 0; |
|
|
var totalBytesRead = 0; |
|
|
|
|
|
|
|
@@ -55,16 +53,7 @@ namespace MQTTnet.Formatter |
|
|
}; |
|
|
}; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#if WINDOWS_UWP |
|
|
|
|
|
// UWP will have a dead lock when calling this not async. |
|
|
|
|
|
var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false); |
|
|
var bodyLength = await ReadBodyLengthAsync(buffer[1], cancellationToken).ConfigureAwait(false); |
|
|
#else |
|
|
|
|
|
// Here the async/await pattern is not used because the overhead of context switches |
|
|
|
|
|
// is too big for reading 1 byte in a row. We expect that the remaining data was sent |
|
|
|
|
|
// directly after the initial bytes. If the client disconnects just in this moment we |
|
|
|
|
|
// will get an exception anyway. |
|
|
|
|
|
var bodyLength = ReadBodyLength(buffer[1], cancellationToken); |
|
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
|
|
if (!bodyLength.HasValue) |
|
|
if (!bodyLength.HasValue) |
|
|
{ |
|
|
{ |
|
@@ -81,49 +70,6 @@ namespace MQTTnet.Formatter |
|
|
}; |
|
|
}; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#if !WINDOWS_UWP |
|
|
|
|
|
private int? ReadBodyLength(byte initialEncodedByte, CancellationToken cancellationToken) |
|
|
|
|
|
{ |
|
|
|
|
|
var offset = 0; |
|
|
|
|
|
var multiplier = 128; |
|
|
|
|
|
var value = initialEncodedByte & 127; |
|
|
|
|
|
int encodedByte = initialEncodedByte; |
|
|
|
|
|
|
|
|
|
|
|
while ((encodedByte & 128) != 0) |
|
|
|
|
|
{ |
|
|
|
|
|
offset++; |
|
|
|
|
|
if (offset > 3) |
|
|
|
|
|
{ |
|
|
|
|
|
throw new MqttProtocolViolationException("Remaining length is invalid."); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (cancellationToken.IsCancellationRequested) |
|
|
|
|
|
{ |
|
|
|
|
|
return null; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var readCount = _channel.ReadAsync(_singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); |
|
|
|
|
|
|
|
|
|
|
|
if (cancellationToken.IsCancellationRequested) |
|
|
|
|
|
{ |
|
|
|
|
|
return null; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (readCount == 0) |
|
|
|
|
|
{ |
|
|
|
|
|
return null; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
encodedByte = _singleByteBuffer[0]; |
|
|
|
|
|
|
|
|
|
|
|
value += (encodedByte & 127) * multiplier; |
|
|
|
|
|
multiplier *= 128; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return value; |
|
|
|
|
|
} |
|
|
|
|
|
#else |
|
|
|
|
|
|
|
|
|
|
|
private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) |
|
|
private async Task<int?> ReadBodyLengthAsync(byte initialEncodedByte, CancellationToken cancellationToken) |
|
|
{ |
|
|
{ |
|
|
var offset = 0; |
|
|
var offset = 0; |
|
@@ -164,6 +110,5 @@ namespace MQTTnet.Formatter |
|
|
|
|
|
|
|
|
return value; |
|
|
return value; |
|
|
} |
|
|
} |
|
|
#endif |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |