Browse Source

Additional performance improvements

release/3.x.x
Christian Kratky 7 years ago
parent
commit
76151deba0
18 changed files with 100 additions and 103 deletions
  1. +5
    -5
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +2
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +6
    -7
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  4. +5
    -5
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  5. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  6. +6
    -7
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  7. +4
    -5
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  8. +4
    -4
      MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs
  9. +21
    -20
      MQTTnet.Core/Client/MqttClient.cs
  10. +1
    -1
      MQTTnet.Core/Client/MqttPacketDispatcher.cs
  11. +6
    -6
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  12. +1
    -1
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  13. +3
    -3
      MQTTnet.Core/Server/MqttClientMessageQueue.cs
  14. +2
    -2
      MQTTnet.Core/Server/MqttClientSession.cs
  15. +5
    -5
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  16. +10
    -10
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  17. +9
    -9
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  18. +8
    -8
      Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs

+ 5
- 5
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -85,7 +85,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
@@ -94,7 +94,7 @@ namespace MQTTnet.Implementations
MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
Thread.Sleep(TimeSpan.FromSeconds(1));
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
} }
} }
} }
@@ -105,10 +105,10 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null);
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false);


var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
@@ -118,7 +118,7 @@ namespace MQTTnet.Implementations
MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
Thread.Sleep(TimeSpan.FromSeconds(1));
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
} }
} }
} }


+ 2
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -46,7 +46,7 @@ namespace MQTTnet.Implementations
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); _socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
} }


await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null);
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false);


if (options.TlsOptions.UseTls) if (options.TlsOptions.UseTls)
{ {
@@ -99,12 +99,11 @@ namespace MQTTnet.Implementations


try try
{ {
int totalBytes = 0;
var totalBytes = 0;


do do
{ {
var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false); var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false);

if (read == 0) if (read == 0)
{ {
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));


+ 6
- 7
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs View File

@@ -17,15 +17,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public async Task ConnectAsync(MqttClientOptions options)
public Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -33,9 +32,9 @@ namespace MQTTnet.Implementations
} }
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
} }


public void Dispose() public void Dispose()
@@ -62,7 +61,7 @@ namespace MQTTnet.Implementations
WebSocketReceiveResult response; WebSocketReceiveResult response;
do do
{ {
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None).ConfigureAwait(false);


temporaryBuffer.CopyTo(WebSocketBuffer, offset); temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count; offset += response.Count;
@@ -72,7 +71,7 @@ namespace MQTTnet.Implementations
WebSocketBufferSize = response.Count; WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close) if (response.MessageType == WebSocketMessageType.Close)
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
} }


Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);


+ 5
- 5
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs View File

@@ -83,7 +83,7 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await _defaultEndpointSocket.AcceptAsync();
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
@@ -92,7 +92,7 @@ namespace MQTTnet.Implementations
MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint."); MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at default endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
Thread.Sleep(TimeSpan.FromSeconds(1));
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
} }
} }
} }
@@ -103,10 +103,10 @@ namespace MQTTnet.Implementations
{ {
try try
{ {
var clientSocket = await _tlsEndpointSocket.AcceptAsync();
var clientSocket = await _tlsEndpointSocket.AcceptAsync().ConfigureAwait(false);


var sslStream = new SslStream(new NetworkStream(clientSocket)); var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
@@ -116,7 +116,7 @@ namespace MQTTnet.Implementations
MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint."); MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while accepting connection at TLS endpoint.");


//excessive CPU consumed if in endless loop of socket errors //excessive CPU consumed if in endless loop of socket errors
Thread.Sleep(TimeSpan.FromSeconds(1));
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
} }
} }
} }


+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -38,6 +38,7 @@ namespace MQTTnet.Implementations
public async Task ConnectAsync(MqttClientOptions options) public async Task ConnectAsync(MqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));

try try
{ {
if (_socket == null) if (_socket == null)
@@ -97,12 +98,11 @@ namespace MQTTnet.Implementations


try try
{ {
int totalBytes = 0;
var totalBytes = 0;


do do
{ {
var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false); var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false);

if (read == 0) if (read == 0)
{ {
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));


+ 6
- 7
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -17,15 +17,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public async Task ConnectAsync(MqttClientOptions options)
public Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -33,9 +32,9 @@ namespace MQTTnet.Implementations
} }
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
} }


public void Dispose() public void Dispose()
@@ -62,7 +61,7 @@ namespace MQTTnet.Implementations
WebSocketReceiveResult response; WebSocketReceiveResult response;
do do
{ {
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None).ConfigureAwait(false);


temporaryBuffer.CopyTo(WebSocketBuffer, offset); temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count; offset += response.Count;
@@ -72,7 +71,7 @@ namespace MQTTnet.Implementations
WebSocketBufferSize = response.Count; WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close) if (response.MessageType == WebSocketMessageType.Close)
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
} }


Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);


+ 4
- 5
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs View File

@@ -17,15 +17,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public async Task ConnectAsync(MqttClientOptions options)
public Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();

await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -33,9 +32,9 @@ namespace MQTTnet.Implementations
} }
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
} }


public void Dispose() public void Dispose()


+ 4
- 4
MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs View File

@@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter
MqttBasePacket packet; MqttBasePacket packet;
if (timeout > TimeSpan.Zero) if (timeout > TimeSpan.Zero)
{ {
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout);
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false);
} }
else else
{ {
packet = await PacketSerializer.DeserializeAsync(_channel);
packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false);
} }


if (packet == null) if (packet == null)
@@ -62,7 +62,7 @@ namespace MQTTnet.Core.Adapter
private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout) private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
{ {
var timeoutTask = Task.Delay(timeout); var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
{ {
throw new MqttCommunicationTimedOutException(); throw new MqttCommunicationTimedOutException();
} }
@@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter
private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
{ {
var timeoutTask = Task.Delay(timeout); var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(timeoutTask, task) == timeoutTask)
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask)
{ {
throw new MqttCommunicationTimedOutException(); throw new MqttCommunicationTimedOutException();
} }


+ 21
- 20
MQTTnet.Core/Client/MqttClient.cs View File

@@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client
{ {
_disconnectedEventSuspended = false; _disconnectedEventSuspended = false;


await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout);
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false);


MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");


@@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client


StartReceivePackets(); StartReceivePackets();


var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket);
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
throw new MqttConnectingFailedException(response.ConnectReturnCode); throw new MqttConnectingFailedException(response.ConnectReturnCode);
} }


@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client
} }
catch (Exception) catch (Exception)
{ {
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
throw; throw;
} }
} }
@@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client
{ {
try try
{ {
await SendAsync(new MqttDisconnectPacket());
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
} }
finally finally
{ {
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
} }
} }


@@ -129,7 +129,7 @@ namespace MQTTnet.Core.Client
TopicFilters = topicFilters TopicFilters = topicFilters
}; };


var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket);
var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket).ConfigureAwait(false);


if (response.SubscribeReturnCodes.Count != topicFilters.Count) if (response.SubscribeReturnCodes.Count != topicFilters.Count)
{ {
@@ -191,8 +191,8 @@ namespace MQTTnet.Core.Client


private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket) private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket)
{ {
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>());
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
} }


private void ThrowIfNotConnected() private void ThrowIfNotConnected()
@@ -200,15 +200,16 @@ namespace MQTTnet.Core.Client
if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
} }


private async Task DisconnectInternalAsync()
private Task DisconnectInternalAsync()
{ {
try try
{ {
await _adapter.DisconnectAsync();
return _adapter.DisconnectAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting.");
return Task.FromResult(0);
} }
finally finally
{ {
@@ -335,8 +336,8 @@ namespace MQTTnet.Core.Client
return pi1.PacketIdentifier == pi2.PacketIdentifier; return pi1.PacketIdentifier == pi2.PacketIdentifier;
} }


await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout);
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false);
} }


private ushort GetNewPacketIdentifier() private ushort GetNewPacketIdentifier()
@@ -352,19 +353,19 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
await Task.Delay(_options.KeepAlivePeriod, cancellationToken);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket());
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
} }
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
} }
finally finally
{ {
@@ -379,7 +380,7 @@ namespace MQTTnet.Core.Client
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero);
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false);
MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet); MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet);


StartProcessReceivedPacket(packet, cancellationToken); StartProcessReceivedPacket(packet, cancellationToken);
@@ -388,12 +389,12 @@ namespace MQTTnet.Core.Client
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
await DisconnectInternalAsync();
await DisconnectInternalAsync().ConfigureAwait(false);
} }
finally finally
{ {


+ 1
- 1
MQTTnet.Core/Client/MqttPacketDispatcher.cs View File

@@ -20,7 +20,7 @@ namespace MQTTnet.Core.Client
var packetAwaiter = AddPacketAwaiter(selector); var packetAwaiter = AddPacketAwaiter(selector);
DispatchPendingPackets(); DispatchPendingPackets();


var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task) != packetAwaiter.Task;
var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task).ConfigureAwait(false) != packetAwaiter.Task;
RemovePacketAwaiter(packetAwaiter); RemovePacketAwaiter(packetAwaiter);


if (hasTimeout) if (hasTimeout)


+ 6
- 6
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -28,8 +28,8 @@ namespace MQTTnet.Core.Serializer


public async Task ReadToEndAsync() public async Task ReadToEndAsync()
{ {
await ReadFixedHeaderAsync();
await ReadRemainingLengthAsync();
await ReadFixedHeaderAsync().ConfigureAwait(false);
await ReadRemainingLengthAsync().ConfigureAwait(false);


if (_remainingLength == 0) if (_remainingLength == 0)
{ {
@@ -37,7 +37,7 @@ namespace MQTTnet.Core.Serializer
} }


var buffer = new byte[_remainingLength]; var buffer = new byte[_remainingLength];
await ReadFromSourceAsync(buffer);
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
_remainingData.Write(buffer, 0, buffer.Length); _remainingData.Write(buffer, 0, buffer.Length);
_remainingData.Position = 0; _remainingData.Position = 0;
@@ -91,7 +91,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte; byte encodedByte;
do do
{ {
encodedByte = await ReadStreamByteAsync();
encodedByte = await ReadStreamByteAsync().ConfigureAwait(false);
value += (encodedByte & 127) * multiplier; value += (encodedByte & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
if (multiplier > 128 * 128 * 128) if (multiplier > 128 * 128 * 128)
@@ -118,13 +118,13 @@ namespace MQTTnet.Core.Serializer
private async Task<byte> ReadStreamByteAsync() private async Task<byte> ReadStreamByteAsync()
{ {
var buffer = new byte[1]; var buffer = new byte[1];
await ReadFromSourceAsync(buffer);
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
return buffer[0]; return buffer[0];
} }


private async Task ReadFixedHeaderAsync() private async Task ReadFixedHeaderAsync()
{ {
FixedHeader = await ReadStreamByteAsync();
FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false);


var byteReader = new ByteReader(FixedHeader); var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4); byteReader.Read(4);


+ 1
- 1
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination);
await output.WriteToAsync(destination).ConfigureAwait(false);
} }
} }




+ 3
- 3
MQTTnet.Core/Server/MqttClientMessageQueue.cs View File

@@ -62,7 +62,7 @@ namespace MQTTnet.Core.Server
{ {
try try
{ {
await _gate.WaitOneAsync();
await _gate.WaitOneAsync().ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested)
{ {
return; return;
@@ -81,7 +81,7 @@ namespace MQTTnet.Core.Server


foreach (var publishPacket in pendingPublishPackets) foreach (var publishPacket in pendingPublishPackets)
{ {
await TrySendPendingPublishPacketAsync(publishPacket);
await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false);
} }
} }
catch (Exception e) catch (Exception e)
@@ -105,7 +105,7 @@ namespace MQTTnet.Core.Server
} }


publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0; publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout);
await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false);


publishPacketContext.IsSent = true; publishPacketContext.IsSent = true;
} }


+ 2
- 2
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -54,8 +54,8 @@ namespace MQTTnet.Core.Server
_messageQueue.Start(adapter); _messageQueue.Start(adapter);
while (!_cancellationTokenSource.IsCancellationRequested) while (!_cancellationTokenSource.IsCancellationRequested)
{ {
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero);
await HandleIncomingPacketAsync(packet);
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false);
await HandleIncomingPacketAsync(packet).ConfigureAwait(false);
} }
} }
catch (MqttCommunicationException) catch (MqttCommunicationException)


+ 5
- 5
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -28,7 +28,7 @@ namespace MQTTnet.Core.Server
{ {
try try
{ {
var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout) as MqttConnectPacket;
var connectPacket = await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false) as MqttConnectPacket;
if (connectPacket == null) if (connectPacket == null)
{ {
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
@@ -43,7 +43,7 @@ namespace MQTTnet.Core.Server
await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode ConnectReturnCode = connectReturnCode
}, _options.DefaultCommunicationTimeout);
}, _options.DefaultCommunicationTimeout).ConfigureAwait(false);


return; return;
} }
@@ -54,9 +54,9 @@ namespace MQTTnet.Core.Server
{ {
ConnectReturnCode = connectReturnCode, ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession IsSessionPresent = clientSession.IsExistingSession
}, _options.DefaultCommunicationTimeout);
}, _options.DefaultCommunicationTimeout).ConfigureAwait(false);


await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter);
await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -64,7 +64,7 @@ namespace MQTTnet.Core.Server
} }
finally finally
{ {
await eventArgs.ClientAdapter.DisconnectAsync();
await eventArgs.ClientAdapter.DisconnectAsync().ConfigureAwait(false);
} }
} }




+ 10
- 10
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -403,24 +403,24 @@ namespace MQTTnet.Core.Tests
_stream.Position = 0; _stream.Position = 0;
} }


public async Task ConnectAsync(MqttClientOptions options)
public Task ConnectAsync(MqttClientOptions options)
{ {
await Task.FromResult(0);
return Task.FromResult(0);
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await Task.FromResult(0);
return Task.FromResult(0);
} }


public async Task WriteAsync(byte[] buffer)
public Task WriteAsync(byte[] buffer)
{ {
await _stream.WriteAsync(buffer, 0, buffer.Length);
return _stream.WriteAsync(buffer, 0, buffer.Length);
} }


public async Task ReadAsync(byte[] buffer)
public Task ReadAsync(byte[] buffer)
{ {
await _stream.ReadAsync(buffer, 0, buffer.Length);
return _stream.ReadAsync(buffer, 0, buffer.Length);
} }


public byte[] ToArray() public byte[] ToArray()
@@ -429,7 +429,7 @@ namespace MQTTnet.Core.Tests
} }
} }


private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)
{ {
var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion };
var channel = new TestChannel(); var channel = new TestChannel();
@@ -439,7 +439,7 @@ namespace MQTTnet.Core.Tests
Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(buffer)); Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(buffer));
} }


private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
private static void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
{ {
var serializer = new MqttPacketSerializer(); var serializer = new MqttPacketSerializer();




+ 9
- 9
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -13,36 +13,36 @@ namespace MQTTnet.Core.Tests
public class MqttServerTests public class MqttServerTests
{ {
[TestMethod] [TestMethod]
public async Task MqttServer_PublishSimple_AtMostOnce()
public void MqttServer_PublishSimple_AtMostOnce()
{ {
await TestPublishAsync(
TestPublishAsync(
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.AtMostOnce, MqttQualityOfServiceLevel.AtMostOnce,
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.AtMostOnce, MqttQualityOfServiceLevel.AtMostOnce,
1);
1).Wait();
} }


[TestMethod] [TestMethod]
public async Task MqttServer_PublishSimple_AtLeastOnce()
public void MqttServer_PublishSimple_AtLeastOnce()
{ {
await TestPublishAsync(
TestPublishAsync(
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.AtLeastOnce, MqttQualityOfServiceLevel.AtLeastOnce,
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.AtLeastOnce, MqttQualityOfServiceLevel.AtLeastOnce,
1);
1).Wait();
} }


[TestMethod] [TestMethod]
public async Task MqttServer_PublishSimple_ExactlyOnce()
public void MqttServer_PublishSimple_ExactlyOnce()
{ {
await TestPublishAsync(
TestPublishAsync(
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.ExactlyOnce, MqttQualityOfServiceLevel.ExactlyOnce,
"A/B/C", "A/B/C",
MqttQualityOfServiceLevel.ExactlyOnce, MqttQualityOfServiceLevel.ExactlyOnce,
1);
1).Wait();
} }


[TestMethod] [TestMethod]


+ 8
- 8
Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs View File

@@ -16,29 +16,29 @@ namespace MQTTnet.Core.Tests


public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer(); public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();


public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
{ {
await Task.FromResult(0);
return Task.FromResult(0);
} }


public async Task DisconnectAsync()
public Task DisconnectAsync()
{ {
await Task.FromResult(0);
return Task.FromResult(0);
} }


public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
{ {
ThrowIfPartnerIsNull(); ThrowIfPartnerIsNull();


Partner.SendPacketInternal(packet); Partner.SendPacketInternal(packet);
await Task.FromResult(0);
return Task.FromResult(0);
} }


public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
public Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
{ {
ThrowIfPartnerIsNull(); ThrowIfPartnerIsNull();


return await Task.Run(() => _incomingPackets.Take());
return Task.Run(() => _incomingPackets.Take());
} }


private void SendPacketInternal(MqttBasePacket packet) private void SendPacketInternal(MqttBasePacket packet)


Loading…
Cancel
Save