Browse Source

Fix awaits in try/catch

release/3.x.x
Christian Kratky 7 years ago
parent
commit
16bc9cb203
8 changed files with 40 additions and 38 deletions
  1. +2
    -2
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  2. +4
    -5
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  3. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  4. +6
    -6
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  5. +8
    -2
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  6. +6
    -6
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs
  7. +10
    -14
      MQTTnet.Core/Client/MqttClient.cs
  8. +2
    -1
      README.md

+ 2
- 2
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -79,13 +79,13 @@ namespace MQTTnet.Implementations
} }
} }


public Task WriteAsync(byte[] buffer)
public async Task WriteAsync(byte[] buffer)
{ {
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (buffer == null) throw new ArgumentNullException(nameof(buffer));


try try
{ {
return _dataStream.WriteAsync(buffer, 0, buffer.Length);
await _dataStream.WriteAsync(buffer, 0, buffer.Length);
} }
catch (SocketException exception) catch (SocketException exception)
{ {


+ 4
- 5
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs View File

@@ -14,14 +14,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -75,7 +75,7 @@ namespace MQTTnet.Implementations
} }
} }


public Task WriteAsync(byte[] buffer)
public async Task WriteAsync(byte[] buffer)
{ {
if (buffer == null) { if (buffer == null) {
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
@@ -83,8 +83,7 @@ namespace MQTTnet.Implementations


try try
{ {
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {


+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -78,13 +78,13 @@ namespace MQTTnet.Implementations
} }
} }


public Task WriteAsync(byte[] buffer)
public async Task WriteAsync(byte[] buffer)
{ {
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (buffer == null) throw new ArgumentNullException(nameof(buffer));


try try
{ {
return _dataStream.WriteAsync(buffer, 0, buffer.Length);
await _dataStream.WriteAsync(buffer, 0, buffer.Length);
} }
catch (SocketException exception) catch (SocketException exception)
{ {


+ 6
- 6
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -14,14 +14,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -75,16 +75,16 @@ namespace MQTTnet.Implementations
} }
} }


public Task WriteAsync(byte[] buffer)
public async Task WriteAsync(byte[] buffer)
{ {
if (buffer == null) {
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
} }


try try
{ {
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {


+ 8
- 2
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs View File

@@ -87,13 +87,19 @@ namespace MQTTnet.Implementations
} }
} }


public async Task ReadAsync(byte[] buffer)
public int Peek()
{
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (buffer == null) throw new ArgumentNullException(nameof(buffer));


try try
{ {
await _socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None);
var result = await _socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None);
return new ArraySegment<byte>(buffer, 0, (int)result.Length);
} }
catch (SocketException exception) catch (SocketException exception)
{ {


+ 6
- 6
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs View File

@@ -17,14 +17,14 @@ namespace MQTTnet.Implementations
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


public Task ConnectAsync(MqttClientOptions options)
public async Task ConnectAsync(MqttClientOptions options)
{ {
_webSocket = null; _webSocket = null;


try try
{ {
_webSocket = new ClientWebSocket(); _webSocket = new ClientWebSocket();
return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {
@@ -89,16 +89,16 @@ namespace MQTTnet.Implementations
} }
} }


public Task WriteAsync(byte[] buffer)
public async Task WriteAsync(byte[] buffer)
{ {
if (buffer == null) {
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer)); throw new ArgumentNullException(nameof(buffer));
} }


try try
{ {
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true,
CancellationToken.None);
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, CancellationToken.None);
} }
catch (WebSocketException exception) catch (WebSocketException exception)
{ {


+ 10
- 14
MQTTnet.Core/Client/MqttClient.cs View File

@@ -200,16 +200,15 @@ namespace MQTTnet.Core.Client
if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
} }


private Task DisconnectInternalAsync()
private async Task DisconnectInternalAsync()
{ {
try try
{ {
return _adapter.DisconnectAsync();
await _adapter.DisconnectAsync();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting.");
return Task.FromResult(0);
} }
finally finally
{ {
@@ -227,28 +226,28 @@ namespace MQTTnet.Core.Client
} }
} }


private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
private async Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
{ {
try try
{ {
if (mqttPacket is MqttPingReqPacket) if (mqttPacket is MqttPingReqPacket)
{ {
return SendAsync(new MqttPingRespPacket());
await SendAsync(new MqttPingRespPacket());
} }


if (mqttPacket is MqttDisconnectPacket) if (mqttPacket is MqttDisconnectPacket)
{ {
return DisconnectAsync();
await DisconnectAsync();
} }


if (mqttPacket is MqttPublishPacket publishPacket) if (mqttPacket is MqttPublishPacket publishPacket)
{ {
return ProcessReceivedPublishPacket(publishPacket);
await ProcessReceivedPublishPacket(publishPacket);
} }


if (mqttPacket is MqttPubRelPacket pubRelPacket) if (mqttPacket is MqttPubRelPacket pubRelPacket)
{ {
return ProcessReceivedPubRelPacket(pubRelPacket);
await ProcessReceivedPubRelPacket(pubRelPacket);
} }


_packetDispatcher.Dispatch(mqttPacket); _packetDispatcher.Dispatch(mqttPacket);
@@ -257,8 +256,6 @@ namespace MQTTnet.Core.Client
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet."); MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet.");
} }

return Task.FromResult(0);
} }


private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
@@ -275,18 +272,17 @@ namespace MQTTnet.Core.Client
} }
} }


private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
{ {
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
FireApplicationMessageReceivedEvent(publishPacket); FireApplicationMessageReceivedEvent(publishPacket);
return Task.FromResult(0);
} }


if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
FireApplicationMessageReceivedEvent(publishPacket); FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
} }


if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
@@ -298,7 +294,7 @@ namespace MQTTnet.Core.Client
} }


FireApplicationMessageReceivedEvent(publishPacket); FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
} }


throw new MqttCommunicationException("Received a not supported QoS level."); throw new MqttCommunicationException("Received a not supported QoS level.");


+ 2
- 1
README.md View File

@@ -12,7 +12,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien
## General ## General
* Async support * Async support
* TLS 1.2 support for client and server (but not UWP servers) * TLS 1.2 support for client and server (but not UWP servers)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project))
* Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS)
* Interfaces included for mocking and testing * Interfaces included for mocking and testing
* Lightweight (only the low level implementation of MQTT, no overhead) * Lightweight (only the low level implementation of MQTT, no overhead)
* Access to internal trace messages * Access to internal trace messages
@@ -20,6 +20,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien


## Client ## Client
* Rx support (via another project) * Rx support (via another project)
* Communication via TCP (+TLS) or WS (WebSocket)


## Server (broker) ## Server (broker)
* List of connected clients available * List of connected clients available


Loading…
Cancel
Save