diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 66fc293..0ecf373 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -79,13 +79,13 @@ namespace MQTTnet.Implementations } } - public Task WriteAsync(byte[] buffer) + public async Task WriteAsync(byte[] buffer) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); try { - return _dataStream.WriteAsync(buffer, 0, buffer.Length); + await _dataStream.WriteAsync(buffer, 0, buffer.Length); } catch (SocketException exception) { diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 94f1330..925c818 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -14,14 +14,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -75,7 +75,7 @@ namespace MQTTnet.Implementations } } - public Task WriteAsync(byte[] buffer) + public async Task WriteAsync(byte[] buffer) { if (buffer == null) { throw new ArgumentNullException(nameof(buffer)); @@ -83,8 +83,7 @@ namespace MQTTnet.Implementations try { - return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, - CancellationToken.None); + await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); } catch (WebSocketException exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 87c89b1..bddd79d 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -78,13 +78,13 @@ namespace MQTTnet.Implementations } } - public Task WriteAsync(byte[] buffer) + public async Task WriteAsync(byte[] buffer) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); try { - return _dataStream.WriteAsync(buffer, 0, buffer.Length); + await _dataStream.WriteAsync(buffer, 0, buffer.Length); } catch (SocketException exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 2aa31b4..c982ed4 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -14,14 +14,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -75,16 +75,16 @@ namespace MQTTnet.Implementations } } - public Task WriteAsync(byte[] buffer) + public async Task WriteAsync(byte[] buffer) { - if (buffer == null) { + if (buffer == null) + { throw new ArgumentNullException(nameof(buffer)); } try { - return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, - CancellationToken.None); + await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); } catch (WebSocketException exception) { diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs index f5d2ea8..4f5aa98 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs @@ -87,13 +87,19 @@ namespace MQTTnet.Implementations } } - public async Task ReadAsync(byte[] buffer) + public int Peek() + { + + } + + public async Task> ReadAsync(int length, byte[] buffer) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); try { - await _socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None); + var result = await _socket.InputStream.ReadAsync(buffer.AsBuffer(), (uint)buffer.Length, InputStreamOptions.None); + return new ArraySegment(buffer, 0, (int)result.Length); } catch (SocketException exception) { diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs index a710975..b3d1f95 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -17,14 +17,14 @@ namespace MQTTnet.Implementations private int WebSocketBufferSize; private int WebSocketBufferOffset; - public Task ConnectAsync(MqttClientOptions options) + public async Task ConnectAsync(MqttClientOptions options) { _webSocket = null; try { _webSocket = new ClientWebSocket(); - return _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); } catch (WebSocketException exception) { @@ -89,16 +89,16 @@ namespace MQTTnet.Implementations } } - public Task WriteAsync(byte[] buffer) + public async Task WriteAsync(byte[] buffer) { - if (buffer == null) { + if (buffer == null) + { throw new ArgumentNullException(nameof(buffer)); } try { - return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, - CancellationToken.None); + await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); } catch (WebSocketException exception) { diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 29b44cf..72e6630 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -200,16 +200,15 @@ namespace MQTTnet.Core.Client if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); } - private Task DisconnectInternalAsync() + private async Task DisconnectInternalAsync() { try { - return _adapter.DisconnectAsync(); + await _adapter.DisconnectAsync(); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting."); - return Task.FromResult(0); } finally { @@ -227,28 +226,28 @@ namespace MQTTnet.Core.Client } } - private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket) + private async Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket) { try { if (mqttPacket is MqttPingReqPacket) { - return SendAsync(new MqttPingRespPacket()); + await SendAsync(new MqttPingRespPacket()); } if (mqttPacket is MqttDisconnectPacket) { - return DisconnectAsync(); + await DisconnectAsync(); } if (mqttPacket is MqttPublishPacket publishPacket) { - return ProcessReceivedPublishPacket(publishPacket); + await ProcessReceivedPublishPacket(publishPacket); } if (mqttPacket is MqttPubRelPacket pubRelPacket) { - return ProcessReceivedPubRelPacket(pubRelPacket); + await ProcessReceivedPubRelPacket(pubRelPacket); } _packetDispatcher.Dispatch(mqttPacket); @@ -257,8 +256,6 @@ namespace MQTTnet.Core.Client { MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet."); } - - return Task.FromResult(0); } private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) @@ -275,18 +272,17 @@ namespace MQTTnet.Core.Client } } - private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) + private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) { if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { FireApplicationMessageReceivedEvent(publishPacket); - return Task.FromResult(0); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { FireApplicationMessageReceivedEvent(publishPacket); - return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -298,7 +294,7 @@ namespace MQTTnet.Core.Client } FireApplicationMessageReceivedEvent(publishPacket); - return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } throw new MqttCommunicationException("Received a not supported QoS level."); diff --git a/README.md b/README.md index 4668d6c..6950a4e 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien ## General * Async support * TLS 1.2 support for client and server (but not UWP servers) -* Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project)) +* Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS) * Interfaces included for mocking and testing * Lightweight (only the low level implementation of MQTT, no overhead) * Access to internal trace messages @@ -20,6 +20,7 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien ## Client * Rx support (via another project) +* Communication via TCP (+TLS) or WS (WebSocket) ## Server (broker) * List of connected clients available