Browse Source

Refactoring.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
9efee7af21
3 changed files with 18 additions and 16 deletions
  1. +7
    -11
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  2. +8
    -0
      Source/MQTTnet/Internal/ExceptionHelper.cs
  3. +3
    -5
      Source/MQTTnet/Serializer/MqttPacketReader.cs

+ 7
- 11
Source/MQTTnet/Adapter/MqttChannelAdapter.cs View File

@@ -128,11 +128,11 @@ namespace MQTTnet.Adapter

if (timeout > TimeSpan.Zero)
{
receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false);
}
else
{
receivedMqttPacket = await ReceiveAsync(_channel, cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false);
}

if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
@@ -163,9 +163,9 @@ namespace MQTTnet.Adapter
return null;
}

private async Task<ReceivedMqttPacket> ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken)
private async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false);
var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(_channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false);

try
{
@@ -189,19 +189,15 @@ namespace MQTTnet.Adapter
}

#if WINDOWS_UWP
var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);
#else
// async/await is not used to avoid the overhead of context switches. We assume that the reamining data
// has been sent from the sender directly after the initial bytes.
var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
#endif

cancellationToken.ThrowIfCancellationRequested();

if (readBytes <= 0)
{
ExceptionHelper.ThrowGracefulSocketClose();
}
ExceptionHelper.ThrowIfGracefulSocketClose(readBytes);

bodyOffset += readBytes;
} while (bodyOffset < body.Length);


+ 8
- 0
Source/MQTTnet/Internal/ExceptionHelper.cs View File

@@ -8,5 +8,13 @@ namespace MQTTnet.Internal
{
throw new MqttCommunicationClosedGracefullyException();
}

public static void ThrowIfGracefulSocketClose(int readBytesCount)
{
if (readBytesCount <= 0)
{
throw new MqttCommunicationClosedGracefullyException();
}
}
}
}

+ 3
- 5
Source/MQTTnet/Serializer/MqttPacketReader.cs View File

@@ -20,11 +20,9 @@ namespace MQTTnet.Serializer
while (totalBytesRead < buffer.Length)
{
var bytesRead = await channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false);
if (bytesRead <= 0)
{
cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowGracefulSocketClose();
}

cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowIfGracefulSocketClose(bytesRead);

totalBytesRead += bytesRead;
}


Loading…
Cancel
Save