Browse Source

Refactor graceful socket close.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
0505efbac1
3 changed files with 28 additions and 25 deletions
  1. +5
    -10
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +12
    -0
      Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs
  3. +11
    -15
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs

+ 5
- 10
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -8,6 +8,7 @@ using System.Threading.Tasks;
using MQTTnet.Channel; using MQTTnet.Channel;
using MQTTnet.Diagnostics; using MQTTnet.Diagnostics;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Packets; using MQTTnet.Packets;
using MQTTnet.Serializer; using MQTTnet.Serializer;


@@ -133,18 +134,14 @@ namespace MQTTnet.Adapter
private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken)
{ {
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false); var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false);
if (!fixedHeader.HasValue)
{
return null;
}

ReadingPacketStarted?.Invoke(this, EventArgs.Empty); ReadingPacketStarted?.Invoke(this, EventArgs.Empty);
try try
{ {
var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false); var bodyLength = await MqttPacketReader.ReadBodyLengthAsync(channel, cancellationToken).ConfigureAwait(false);
if (bodyLength == 0) if (bodyLength == 0)
{ {
return new ReceivedMqttPacket(fixedHeader.Value, null);
return new ReceivedMqttPacket(fixedHeader, null);
} }


var body = new MemoryStream(bodyLength); var body = new MemoryStream(bodyLength);
@@ -159,11 +156,9 @@ namespace MQTTnet.Adapter
} }


var readBytesCount = await channel.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false); var readBytesCount = await channel.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false);

// Check if the client closed the connection before sending the full body.
if (readBytesCount <= 0) if (readBytesCount <= 0)
{ {
throw new MqttCommunicationException("Connection closed while reading remaining packet body.");
ExceptionHelper.ThrowGracefulSocketClose();
} }


// Here is no need to await because internally only an array is used and no real I/O operation is made. // Here is no need to await because internally only an array is used and no real I/O operation is made.
@@ -173,7 +168,7 @@ namespace MQTTnet.Adapter


body.Seek(0L, SeekOrigin.Begin); body.Seek(0L, SeekOrigin.Begin);


return new ReceivedMqttPacket(fixedHeader.Value, body);
return new ReceivedMqttPacket(fixedHeader, body);
} }
finally finally
{ {


+ 12
- 0
Frameworks/MQTTnet.NetStandard/Internal/ExceptionHelper.cs View File

@@ -0,0 +1,12 @@
using MQTTnet.Exceptions;

namespace MQTTnet.Internal
{
public static class ExceptionHelper
{
public static void ThrowGracefulSocketClose()
{
throw new MqttCommunicationException("Connection gracefully closed from the remote party.");
}
}
}

+ 11
- 15
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs View File

@@ -5,18 +5,14 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Channel; using MQTTnet.Channel;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Internal;


namespace MQTTnet.Serializer namespace MQTTnet.Serializer
{ {
public static class MqttPacketReader public static class MqttPacketReader
{ {
public static async Task<byte?> ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken)
public static async Task<byte> ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken)
{ {
if (cancellationToken.IsCancellationRequested)
{
return null;
}

// Wait for the next package which starts with the header. At this point there will probably // Wait for the next package which starts with the header. At this point there will probably
// some large delay and thus the thread should be put back to the pool (await). So ReadByte() // some large delay and thus the thread should be put back to the pool (await). So ReadByte()
// is not an option here. // is not an option here.
@@ -24,7 +20,7 @@ namespace MQTTnet.Serializer
var readCount = await channel.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); var readCount = await channel.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
if (readCount <= 0) if (readCount <= 0)
{ {
return null;
ExceptionHelper.ThrowGracefulSocketClose();
} }


return buffer[0]; return buffer[0];
@@ -68,13 +64,6 @@ namespace MQTTnet.Serializer
return stream.ReadBytes((int)(stream.Length - stream.Position)); return stream.ReadBytes((int)(stream.Length - stream.Position));
} }


public static byte[] ReadBytes(this Stream stream, int count)
{
var buffer = new byte[count];
stream.Read(buffer, 0, count);
return buffer;
}

public static async Task<int> ReadBodyLengthAsync(IMqttChannel channel, CancellationToken cancellationToken) public static async Task<int> ReadBodyLengthAsync(IMqttChannel channel, CancellationToken cancellationToken)
{ {
// Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html.
@@ -93,7 +82,7 @@ namespace MQTTnet.Serializer
var readCount = await channel.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); var readCount = await channel.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false);
if (readCount <= 0) if (readCount <= 0)
{ {
throw new MqttCommunicationException("Connection closed while reading remaining length data.");
ExceptionHelper.ThrowGracefulSocketClose();
} }


encodedByte = buffer[0]; encodedByte = buffer[0];
@@ -109,5 +98,12 @@ namespace MQTTnet.Serializer


return value; return value;
} }

private static byte[] ReadBytes(this Stream stream, int count)
{
var buffer = new byte[count];
stream.Read(buffer, 0, count);
return buffer;
}
} }
} }

Loading…
Cancel
Save