From 76151deba0aea8f339b49ba8c444489597b9dcf5 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 9 Sep 2017 23:53:42 +0200 Subject: [PATCH] Additional performance improvements --- .../Implementations/MqttServerAdapter.cs | 10 ++--- .../Implementations/MqttTcpChannel.cs | 5 +-- .../Implementations/MqttWebSocketChannel.cs | 13 +++--- .../Implementations/MqttServerAdapter.cs | 10 ++--- .../Implementations/MqttTcpChannel.cs | 4 +- .../Implementations/MqttWebSocketChannel.cs | 13 +++--- .../Implementations/MqttWebSocketChannel.cs | 9 ++-- .../MqttChannelCommunicationAdapter.cs | 8 ++-- MQTTnet.Core/Client/MqttClient.cs | 41 ++++++++++--------- MQTTnet.Core/Client/MqttPacketDispatcher.cs | 2 +- MQTTnet.Core/Serializer/MqttPacketReader.cs | 12 +++--- .../Serializer/MqttPacketSerializer.cs | 2 +- MQTTnet.Core/Server/MqttClientMessageQueue.cs | 6 +-- MQTTnet.Core/Server/MqttClientSession.cs | 4 +- .../Server/MqttClientSessionsManager.cs | 10 ++--- .../MqttPacketSerializerTests.cs | 20 ++++----- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 18 ++++---- .../TestMqttCommunicationAdapter.cs | 16 ++++---- 18 files changed, 100 insertions(+), 103 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 92e071e..a64b445 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -85,7 +85,7 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); + var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } @@ -94,7 +94,7 @@ namespace MQTTnet.Implementations MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); //excessive CPU consumed if in endless loop of socket errors - Thread.Sleep(TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } @@ -105,10 +105,10 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null); + var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false); var sslStream = new SslStream(new NetworkStream(clientSocket)); - await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); + await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); @@ -118,7 +118,7 @@ namespace MQTTnet.Implementations MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); //excessive CPU consumed if in endless loop of socket errors - Thread.Sleep(TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index a3b8147..5626256 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -46,7 +46,7 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false); if (options.TlsOptions.UseTls) { @@ -99,12 +99,11 @@ namespace MQTTnet.Implementations try { - int totalBytes = 0; + var totalBytes = 0; do { var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false); - if (read == 0) { throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 29fcae9..b90b02a 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -17,15 +17,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public async Task ConnectAsync(MqttClientOptions options) + public Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -33,9 +32,9 @@ namespace MQTTnet.Implementations } } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } public void Dispose() @@ -62,7 +61,7 @@ namespace MQTTnet.Implementations WebSocketReceiveResult response; do { - response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None).ConfigureAwait(false); temporaryBuffer.CopyTo(WebSocketBuffer, offset); offset += response.Count; @@ -72,7 +71,7 @@ namespace MQTTnet.Implementations WebSocketBufferSize = response.Count; if (response.MessageType == WebSocketMessageType.Close) { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); } Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs index 56c8481..6b02648 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -83,7 +83,7 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await _defaultEndpointSocket.AcceptAsync(); + var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } @@ -92,7 +92,7 @@ namespace MQTTnet.Implementations MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); //excessive CPU consumed if in endless loop of socket errors - Thread.Sleep(TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } @@ -103,10 +103,10 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await _tlsEndpointSocket.AcceptAsync(); + var clientSocket = await _tlsEndpointSocket.AcceptAsync().ConfigureAwait(false); var sslStream = new SslStream(new NetworkStream(clientSocket)); - await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); + await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); @@ -116,7 +116,7 @@ namespace MQTTnet.Implementations MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); //excessive CPU consumed if in endless loop of socket errors - Thread.Sleep(TimeSpan.FromSeconds(1)); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 8486ca1..5b20cf1 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -38,6 +38,7 @@ namespace MQTTnet.Implementations public async Task ConnectAsync(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); + try { if (_socket == null) @@ -97,12 +98,11 @@ namespace MQTTnet.Implementations try { - int totalBytes = 0; + var totalBytes = 0; do { var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false); - if (read == 0) { throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 29fcae9..b90b02a 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -17,15 +17,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public async Task ConnectAsync(MqttClientOptions options) + public Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -33,9 +32,9 @@ namespace MQTTnet.Implementations } } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } public void Dispose() @@ -62,7 +61,7 @@ namespace MQTTnet.Implementations WebSocketReceiveResult response; do { - response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None).ConfigureAwait(false); temporaryBuffer.CopyTo(WebSocketBuffer, offset); offset += response.Count; @@ -72,7 +71,7 @@ namespace MQTTnet.Implementations WebSocketBufferSize = response.Count; if (response.MessageType == WebSocketMessageType.Close) { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); } Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs index 29fcae9..a710975 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -17,15 +17,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public async Task ConnectAsync(MqttClientOptions options) + public Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - - await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -33,9 +32,9 @@ namespace MQTTnet.Implementations } } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } public void Dispose() diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 5042167..d0f1c6e 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); + packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false); } else { - packet = await PacketSerializer.DeserializeAsync(_channel); + packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false); } if (packet == null) @@ -62,7 +62,7 @@ namespace MQTTnet.Core.Adapter private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } @@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 518eb37..29b44cf 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client { _disconnectedEventSuspended = false; - await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); @@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client StartReceivePackets(); - var response = await SendAndReceiveAsync(connectPacket); + var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); throw new MqttConnectingFailedException(response.ConnectReturnCode); } @@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client } catch (Exception) { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); throw; } } @@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client { try { - await SendAsync(new MqttDisconnectPacket()); + await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); } finally { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } } @@ -129,7 +129,7 @@ namespace MQTTnet.Core.Client TopicFilters = topicFilters }; - var response = await SendAndReceiveAsync(subscribePacket); + var response = await SendAndReceiveAsync(subscribePacket).ConfigureAwait(false); if (response.SubscribeReturnCodes.Count != topicFilters.Count) { @@ -191,8 +191,8 @@ namespace MQTTnet.Core.Client private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket) { - var pubRecPacket = await SendAndReceiveAsync(publishPacket); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()); + var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); } private void ThrowIfNotConnected() @@ -200,15 +200,16 @@ namespace MQTTnet.Core.Client if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); } - private async Task DisconnectInternalAsync() + private Task DisconnectInternalAsync() { try { - await _adapter.DisconnectAsync(); + return _adapter.DisconnectAsync(); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); + return Task.FromResult(0); } finally { @@ -335,8 +336,8 @@ namespace MQTTnet.Core.Client return pi1.PacketIdentifier == pi2.PacketIdentifier; } - await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout); + await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false); } private ushort GetNewPacketIdentifier() @@ -352,19 +353,19 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(_options.KeepAlivePeriod, cancellationToken); - await SendAndReceiveAsync(new MqttPingReqPacket()); + await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); + await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); } } catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } finally { @@ -379,7 +380,7 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false); MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); StartProcessReceivedPacket(packet, cancellationToken); @@ -388,12 +389,12 @@ namespace MQTTnet.Core.Client catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } finally { diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index b613f06..0784e8a 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -20,7 +20,7 @@ namespace MQTTnet.Core.Client var packetAwaiter = AddPacketAwaiter(selector); DispatchPendingPackets(); - var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task) != packetAwaiter.Task; + var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task).ConfigureAwait(false) != packetAwaiter.Task; RemovePacketAwaiter(packetAwaiter); if (hasTimeout) diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 6a31fca..f64c0c9 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -28,8 +28,8 @@ namespace MQTTnet.Core.Serializer public async Task ReadToEndAsync() { - await ReadFixedHeaderAsync(); - await ReadRemainingLengthAsync(); + await ReadFixedHeaderAsync().ConfigureAwait(false); + await ReadRemainingLengthAsync().ConfigureAwait(false); if (_remainingLength == 0) { @@ -37,7 +37,7 @@ namespace MQTTnet.Core.Serializer } var buffer = new byte[_remainingLength]; - await ReadFromSourceAsync(buffer); + await ReadFromSourceAsync(buffer).ConfigureAwait(false); _remainingData.Write(buffer, 0, buffer.Length); _remainingData.Position = 0; @@ -91,7 +91,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = await ReadStreamByteAsync(); + encodedByte = await ReadStreamByteAsync().ConfigureAwait(false); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) @@ -118,13 +118,13 @@ namespace MQTTnet.Core.Serializer private async Task ReadStreamByteAsync() { var buffer = new byte[1]; - await ReadFromSourceAsync(buffer); + await ReadFromSourceAsync(buffer).ConfigureAwait(false); return buffer[0]; } private async Task ReadFixedHeaderAsync() { - FixedHeader = await ReadStreamByteAsync(); + FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false); var byteReader = new ByteReader(FixedHeader); byteReader.Read(4); diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 38017b6..e2e8f44 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); - await output.WriteToAsync(destination); + await output.WriteToAsync(destination).ConfigureAwait(false); } } diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 07c6ea5..22fbdbb 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -62,7 +62,7 @@ namespace MQTTnet.Core.Server { try { - await _gate.WaitOneAsync(); + await _gate.WaitOneAsync().ConfigureAwait(false); if (cancellationToken.IsCancellationRequested) { return; @@ -81,7 +81,7 @@ namespace MQTTnet.Core.Server foreach (var publishPacket in pendingPublishPackets) { - await TrySendPendingPublishPacketAsync(publishPacket); + await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false); } } catch (Exception e) @@ -105,7 +105,7 @@ namespace MQTTnet.Core.Server } publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0; - await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout); + await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); publishPacketContext.IsSent = true; } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 0b69425..0dd8c85 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -54,8 +54,8 @@ namespace MQTTnet.Core.Server _messageQueue.Start(adapter); while (!_cancellationTokenSource.IsCancellationRequested) { - var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero); - await HandleIncomingPacketAsync(packet); + var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false); + await HandleIncomingPacketAsync(packet).ConfigureAwait(false); } } catch (MqttCommunicationException) diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index 79823e7..b8ff4e6 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server { try { - var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket; + var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false) as MqttConnectPacket; if (connectPacket == null) { throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); @@ -43,7 +43,7 @@ namespace MQTTnet.Core.Server await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket { ConnectReturnCode = connectReturnCode - }, _options.DefaultCommunicationTimeout); + }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); return; } @@ -54,9 +54,9 @@ namespace MQTTnet.Core.Server { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession - }, _options.DefaultCommunicationTimeout); + }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); - await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter); + await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false); } catch (Exception exception) { @@ -64,7 +64,7 @@ namespace MQTTnet.Core.Server } finally { - await eventArgs.ClientAdapter.DisconnectAsync(); + await eventArgs.ClientAdapter.DisconnectAsync().ConfigureAwait(false); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index c03fa7a..fad1f4e 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -403,24 +403,24 @@ namespace MQTTnet.Core.Tests _stream.Position = 0; } - public async Task ConnectAsync(MqttClientOptions options) + public Task ConnectAsync(MqttClientOptions options) { - await Task.FromResult(0); + return Task.FromResult(0); } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await Task.FromResult(0); + return Task.FromResult(0); } - public async Task WriteAsync(byte[] buffer) + public Task WriteAsync(byte[] buffer) { - await _stream.WriteAsync(buffer, 0, buffer.Length); + return _stream.WriteAsync(buffer, 0, buffer.Length); } - public async Task ReadAsync(byte[] buffer) + public Task ReadAsync(byte[] buffer) { - await _stream.ReadAsync(buffer, 0, buffer.Length); + return _stream.ReadAsync(buffer, 0, buffer.Length); } public byte[] ToArray() @@ -429,7 +429,7 @@ namespace MQTTnet.Core.Tests } } - private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) { var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; var channel = new TestChannel(); @@ -439,7 +439,7 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(buffer)); } - private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) + private static void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { var serializer = new MqttPacketSerializer(); diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 8454c76..9ee2290 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -13,36 +13,36 @@ namespace MQTTnet.Core.Tests public class MqttServerTests { [TestMethod] - public async Task MqttServer_PublishSimple_AtMostOnce() + public void MqttServer_PublishSimple_AtMostOnce() { - await TestPublishAsync( + TestPublishAsync( "A/B/C", MqttQualityOfServiceLevel.AtMostOnce, "A/B/C", MqttQualityOfServiceLevel.AtMostOnce, - 1); + 1).Wait(); } [TestMethod] - public async Task MqttServer_PublishSimple_AtLeastOnce() + public void MqttServer_PublishSimple_AtLeastOnce() { - await TestPublishAsync( + TestPublishAsync( "A/B/C", MqttQualityOfServiceLevel.AtLeastOnce, "A/B/C", MqttQualityOfServiceLevel.AtLeastOnce, - 1); + 1).Wait(); } [TestMethod] - public async Task MqttServer_PublishSimple_ExactlyOnce() + public void MqttServer_PublishSimple_ExactlyOnce() { - await TestPublishAsync( + TestPublishAsync( "A/B/C", MqttQualityOfServiceLevel.ExactlyOnce, "A/B/C", MqttQualityOfServiceLevel.ExactlyOnce, - 1); + 1).Wait(); } [TestMethod] diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index c8c6da6..fe6e448 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -16,29 +16,29 @@ namespace MQTTnet.Core.Tests public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); - public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) + public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - await Task.FromResult(0); + return Task.FromResult(0); } - public async Task DisconnectAsync() + public Task DisconnectAsync() { - await Task.FromResult(0); + return Task.FromResult(0); } - public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) { ThrowIfPartnerIsNull(); Partner.SendPacketInternal(packet); - await Task.FromResult(0); + return Task.FromResult(0); } - public async Task ReceivePacketAsync(TimeSpan timeout) + public Task ReceivePacketAsync(TimeSpan timeout) { ThrowIfPartnerIsNull(); - return await Task.Run(() => _incomingPackets.Take()); + return Task.Run(() => _incomingPackets.Take()); } private void SendPacketInternal(MqttBasePacket packet)