From 1c30d149108d99362403eb2f26826e48d52969cc Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 9 Sep 2017 23:13:53 +0200 Subject: [PATCH] Refactoring of WebSocket support --- Build/MQTTnet.nuspec | 3 +- .../Implementations/MqttWebSocketChannel.cs} | 17 +-- .../MQTTnet.NetFramework.csproj | 1 + .../MQTTnet.NetFramework/MqttClientFactory.cs | 19 ++- .../Implementations/MqttWebSocketChannel.cs | 110 ++++++++++++++++++ .../MQTTnet.NetStandard/MqttClientFactory.cs | 12 +- .../Implementations/MqttWebSocketChannel.cs | 110 ++++++++++++++++++ .../MQTTnet.UniversalWindows.csproj | 1 + .../MqttClientFactory.cs | 19 ++- MQTTnet.Core/Client/ConnectionType.cs | 14 --- MQTTnet.Core/Client/MqttClientOptions.cs | 2 +- MQTTnet.Core/Client/MqttConnectionType.cs | 10 ++ MQTTnet.TestApp.NetCore/Program.cs | 2 +- 13 files changed, 282 insertions(+), 38 deletions(-) rename Frameworks/{MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs => MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs} (87%) create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs create mode 100644 Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs delete mode 100644 MQTTnet.Core/Client/ConnectionType.cs create mode 100644 MQTTnet.Core/Client/MqttConnectionType.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 6da1a6f..78c44f9 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,7 +10,8 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * + * [Client] Added support for web socket communication channel (thanks to nowakpiotr) +* [Core] Performance optimizations (thanks to JanEggers) Copyright Christian Kratky 2016-2017 MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware Arduino diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs similarity index 87% rename from Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs rename to Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 8d51812..29fcae9 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -8,20 +8,15 @@ using System.Threading.Tasks; namespace MQTTnet.Implementations { - public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable + public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { - private ClientWebSocket _webSocket; + private ClientWebSocket _webSocket = new ClientWebSocket(); private const int BufferSize = 4096; private const int BufferAmplifier = 20; private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; private int WebSocketBufferSize; private int WebSocketBufferOffset; - public MqttWebSocketsChannel() - { - _webSocket = new ClientWebSocket(); - } - public async Task ConnectAsync(MqttClientOptions options) { _webSocket = null; @@ -45,10 +40,7 @@ namespace MQTTnet.Implementations public void Dispose() { - if (_webSocket != null) - { - _webSocket.Dispose(); - } + _webSocket?.Dispose(); } public Task ReadAsync(byte[] buffer) @@ -70,8 +62,7 @@ namespace MQTTnet.Implementations WebSocketReceiveResult response; do { - response = - await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); temporaryBuffer.CopyTo(WebSocketBuffer, offset); offset += response.Count; diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj index 7f4ab44..7578afe 100644 --- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj +++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj @@ -100,6 +100,7 @@ + diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index 7dd7500..8075f92 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -1,5 +1,6 @@ using System; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Channel; using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using MQTTnet.Implementations; @@ -12,7 +13,23 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); + } + + private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + { + switch (options.ConnectionType) + { + case MqttConnectionType.Tcp: + case MqttConnectionType.Tls: + return new MqttTcpChannel(); + case MqttConnectionType.Ws: + case MqttConnectionType.Wss: + return new MqttWebSocketChannel(); + + default: + throw new NotSupportedException(); + } } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs new file mode 100644 index 0000000..29fcae9 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -0,0 +1,110 @@ +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable + { + private ClientWebSocket _webSocket = new ClientWebSocket(); + private const int BufferSize = 4096; + private const int BufferAmplifier = 20; + private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize; + private int WebSocketBufferOffset; + + public async Task ConnectAsync(MqttClientOptions options) + { + _webSocket = null; + + try + { + _webSocket = new ClientWebSocket(); + + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task DisconnectAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + _webSocket?.Dispose(); + } + + public Task ReadAsync(byte[] buffer) + { + return Task.WhenAll(ReadToBufferAsync(buffer)); + } + + private async Task ReadToBufferAsync(byte[] buffer) + { + var temporaryBuffer = new byte[BufferSize]; + var offset = 0; + + while (_webSocket.State == WebSocketState.Open) + { + if (WebSocketBufferSize == 0) + { + WebSocketBufferOffset = 0; + + WebSocketReceiveResult response; + do + { + response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + + temporaryBuffer.CopyTo(WebSocketBuffer, offset); + offset += response.Count; + temporaryBuffer = new byte[BufferSize]; + } while (!response.EndOfMessage); + + WebSocketBufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + else + { + Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + + return; + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) { + throw new ArgumentNullException(nameof(buffer)); + } + + try + { + return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, + CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 1cd6d6f..5857075 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -22,15 +22,15 @@ namespace MQTTnet { switch (options.ConnectionType) { - case ConnectionTypes.TCP: - case ConnectionTypes.TLS: + case MqttConnectionType.Tcp: + case MqttConnectionType.Tls: return new MqttTcpChannel(); - case ConnectionTypes.WS: - case ConnectionTypes.WSS: - return new MqttWebSocketsChannel(); + case MqttConnectionType.Ws: + case MqttConnectionType.Wss: + return new MqttWebSocketChannel(); default: - return null; + throw new NotSupportedException(); } } } diff --git a/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs new file mode 100644 index 0000000..29fcae9 --- /dev/null +++ b/Frameworks/MQTTnet.UniversalWindows/Implementations/MqttWebSocketChannel.cs @@ -0,0 +1,110 @@ +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable + { + private ClientWebSocket _webSocket = new ClientWebSocket(); + private const int BufferSize = 4096; + private const int BufferAmplifier = 20; + private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize; + private int WebSocketBufferOffset; + + public async Task ConnectAsync(MqttClientOptions options) + { + _webSocket = null; + + try + { + _webSocket = new ClientWebSocket(); + + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task DisconnectAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + _webSocket?.Dispose(); + } + + public Task ReadAsync(byte[] buffer) + { + return Task.WhenAll(ReadToBufferAsync(buffer)); + } + + private async Task ReadToBufferAsync(byte[] buffer) + { + var temporaryBuffer = new byte[BufferSize]; + var offset = 0; + + while (_webSocket.State == WebSocketState.Open) + { + if (WebSocketBufferSize == 0) + { + WebSocketBufferOffset = 0; + + WebSocketReceiveResult response; + do + { + response = await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + + temporaryBuffer.CopyTo(WebSocketBuffer, offset); + offset += response.Count; + temporaryBuffer = new byte[BufferSize]; + } while (!response.EndOfMessage); + + WebSocketBufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + else + { + Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + + return; + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) { + throw new ArgumentNullException(nameof(buffer)); + } + + try + { + return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, + CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj index 791ee28..3dd8a2d 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj +++ b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj @@ -110,6 +110,7 @@ bin\x64\Release\MQTTnet.XML + diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs index 7dd7500..8075f92 100644 --- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -1,5 +1,6 @@ using System; using MQTTnet.Core.Adapter; +using MQTTnet.Core.Channel; using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using MQTTnet.Implementations; @@ -12,7 +13,23 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); + } + + private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + { + switch (options.ConnectionType) + { + case MqttConnectionType.Tcp: + case MqttConnectionType.Tls: + return new MqttTcpChannel(); + case MqttConnectionType.Ws: + case MqttConnectionType.Wss: + return new MqttWebSocketChannel(); + + default: + throw new NotSupportedException(); + } } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/ConnectionType.cs b/MQTTnet.Core/Client/ConnectionType.cs deleted file mode 100644 index 1c6abad..0000000 --- a/MQTTnet.Core/Client/ConnectionType.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace MQTTnet.Core.Client -{ - public enum ConnectionTypes - { - TCP, - TLS, - WS, - WSS - } -} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index 8b65299..12e7ae9 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -25,6 +25,6 @@ namespace MQTTnet.Core.Client public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; + public MqttConnectionType ConnectionType { get; set; } = MqttConnectionType.Tcp; } } diff --git a/MQTTnet.Core/Client/MqttConnectionType.cs b/MQTTnet.Core/Client/MqttConnectionType.cs new file mode 100644 index 0000000..636054d --- /dev/null +++ b/MQTTnet.Core/Client/MqttConnectionType.cs @@ -0,0 +1,10 @@ +namespace MQTTnet.Core.Client +{ + public enum MqttConnectionType + { + Tcp, + Tls, + Ws, + Wss + } +} diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs index 4644471..e5470d9 100644 --- a/MQTTnet.TestApp.NetCore/Program.cs +++ b/MQTTnet.TestApp.NetCore/Program.cs @@ -51,7 +51,7 @@ namespace MQTTnet.TestApp.NetCore Server = "localhost", ClientId = "XYZ", CleanSession = true, - ConnectionType = ConnectionTypes.WS + ConnectionType = MqttConnectionType.Ws }; var client = new MqttClientFactory().CreateMqttClient(options);