diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index f909db1..4e61733 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -128,11 +128,11 @@ namespace MQTTnet.Adapter if (timeout > TimeSpan.Zero) { - receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false); + receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false); } else { - receivedMqttPacket = await ReceiveAsync(_channel, cancellationToken).ConfigureAwait(false); + receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false); } if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) @@ -163,9 +163,9 @@ namespace MQTTnet.Adapter return null; } - private async Task ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) + private async Task ReceiveAsync(CancellationToken cancellationToken) { - var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false); + var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(_channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false); try { @@ -189,19 +189,15 @@ namespace MQTTnet.Adapter } #if WINDOWS_UWP - var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); + var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); #else // async/await is not used to avoid the overhead of context switches. We assume that the reamining data // has been sent from the sender directly after the initial bytes. - var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); + var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); #endif cancellationToken.ThrowIfCancellationRequested(); - - if (readBytes <= 0) - { - ExceptionHelper.ThrowGracefulSocketClose(); - } + ExceptionHelper.ThrowIfGracefulSocketClose(readBytes); bodyOffset += readBytes; } while (bodyOffset < body.Length); diff --git a/Source/MQTTnet/Internal/ExceptionHelper.cs b/Source/MQTTnet/Internal/ExceptionHelper.cs index f3cadc9..5bc8e43 100644 --- a/Source/MQTTnet/Internal/ExceptionHelper.cs +++ b/Source/MQTTnet/Internal/ExceptionHelper.cs @@ -8,5 +8,13 @@ namespace MQTTnet.Internal { throw new MqttCommunicationClosedGracefullyException(); } + + public static void ThrowIfGracefulSocketClose(int readBytesCount) + { + if (readBytesCount <= 0) + { + throw new MqttCommunicationClosedGracefullyException(); + } + } } } diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index 73e8a66..f115b50 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -20,11 +20,9 @@ namespace MQTTnet.Serializer while (totalBytesRead < buffer.Length) { var bytesRead = await channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false); - if (bytesRead <= 0) - { - cancellationToken.ThrowIfCancellationRequested(); - ExceptionHelper.ThrowGracefulSocketClose(); - } + + cancellationToken.ThrowIfCancellationRequested(); + ExceptionHelper.ThrowIfGracefulSocketClose(bytesRead); totalBytesRead += bytesRead; }