Przeglądaj źródła

Fix message handling for UWP.

release/3.x.x
Christian Kratky 6 lat temu
rodzic
commit
2f10295022
3 zmienionych plików z 75 dodań i 18 usunięć
  1. +11
    -6
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  2. +64
    -10
      Source/MQTTnet/Serializer/MqttPacketReader.cs
  3. +0
    -2
      Source/MQTTnet/Serializer/MqttPacketWriter.cs

+ 11
- 6
Source/MQTTnet/Adapter/MqttChannelAdapter.cs Wyświetl plik

@@ -97,13 +97,11 @@ namespace MQTTnet.Adapter
await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
_logger.Verbose("TX >>> {0}", packet);

var packetData = PacketSerializer.Serialize(packet);

await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);

PacketSerializer.FreeBuffer();

_logger.Verbose("TX >>> {0}", packet);
}
catch (Exception exception)
{
@@ -149,7 +147,7 @@ namespace MQTTnet.Adapter
}

_logger.Verbose("RX <<< {0}", packet);
return packet;
}
catch (Exception exception)
@@ -190,9 +188,16 @@ namespace MQTTnet.Adapter
chunkSize = bytesLeft;
}

#if WINDOWS_UWP
var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
#else
// async/await is not used to avoid the overhead of context switches. We assume that the reamining data
// has been sent from the sender directly after the initial bytes.
var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).GetAwaiter().GetResult();
var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
#endif

cancellationToken.ThrowIfCancellationRequested();

if (readBytes <= 0)
{
ExceptionHelper.ThrowGracefulSocketClose();


+ 64
- 10
Source/MQTTnet/Serializer/MqttPacketReader.cs Wyświetl plik

@@ -34,50 +34,104 @@ namespace MQTTnet.Serializer
{
return new MqttFixedHeader(buffer[0], 0);
}

#if WINDOWS_UWP
// UWP will have a dead lock when calling this not async.
var bodyLength = await ReadBodyLengthAsync(channel, buffer[1], singleByteBuffer, cancellationToken).ConfigureAwait(false);
#else
// Here the async/await pattern is not used becuase the overhead of context switches
// is too big for reading 1 byte in a row. We expect that the remaining data was sent
// directly after the initial bytes. If the client disconnects just in this moment we
// will get an exception anyway.
var bodyLength = ReadBodyLength(channel, buffer[1], singleByteBuffer, cancellationToken);
#endif

return new MqttFixedHeader(buffer[0], bodyLength);
}

#if !WINDOWS_UWP
private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html.
var offset = 0;
var multiplier = 128;
var value = initialEncodedByte & 127;
int encodedByte = initialEncodedByte;

while ((encodedByte & 128) != 0)
{
offset++;
if (offset > 3)
{
throw new MqttProtocolViolationException("Remaining length is invalid.");
}

cancellationToken.ThrowIfCancellationRequested();

// Here the async/await pattern is not used becuase the overhead of context switches
// is too big for reading 1 byte in a row. We expect that the remaining data was sent
// directly after the initial bytes. If the client disconnects just in this moment we
// will get an exception anyway.
encodedByte = ReadByte(channel, singleByteBuffer, cancellationToken);

value += (byte)(encodedByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128)
multiplier *= 128;
}

return value;
}

private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();

cancellationToken.ThrowIfCancellationRequested();

if (readCount <= 0)
{
ExceptionHelper.ThrowGracefulSocketClose();
}

return singleByteBuffer[0];
}

#else
private static async Task<int> ReadBodyLengthAsync(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var offset = 0;
var multiplier = 128;
var value = initialEncodedByte & 127;
int encodedByte = initialEncodedByte;

while ((encodedByte & 128) != 0)
{
offset++;
if (offset > 3)
{
throw new MqttProtocolViolationException("Remaining length is invalid.");
}

cancellationToken.ThrowIfCancellationRequested();

encodedByte = await ReadByteAsync(channel, singleByteBuffer, cancellationToken).ConfigureAwait(false);

value += (byte)(encodedByte & 127) * multiplier;
multiplier *= 128;
}

return value;
}

private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
private static async Task<byte> ReadByteAsync(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken)
{
var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult();
var readCount = await channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

if (readCount <= 0)
{
cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowGracefulSocketClose();
}

return singleByteBuffer[0];
}

#endif
}
}

+ 0
- 2
Source/MQTTnet/Serializer/MqttPacketWriter.cs Wyświetl plik

@@ -29,7 +29,6 @@ namespace MQTTnet.Serializer

public static ArraySegment<byte> EncodeRemainingLength(int length)
{
// write the encoded remaining length right aligned on the 4 byte buffer
if (length <= 0)
{
return new ArraySegment<byte>(new byte[1], 0, 1);
@@ -38,7 +37,6 @@ namespace MQTTnet.Serializer
var buffer = new byte[4];
var bufferOffset = 0;

// Algorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = length;
do
{


Ładowanie…
Anuluj
Zapisz