From 3a1e32f5aa2b8532762d786668af22dfe29ca9b3 Mon Sep 17 00:00:00 2001 From: "IMPAQ\\pnow" Date: Fri, 1 Sep 2017 09:39:59 +0200 Subject: [PATCH 1/2] Added support for websockets --- .../Implementations/MqttWebSocketsChannel.cs | 119 ++++++++++++ .../MQTTnet.Netstandard.csproj | 2 + .../MQTTnet.NetStandard/MqttClientFactory.cs | 19 +- MQTTnet.Core/Client/ConnectionType.cs | 14 ++ MQTTnet.Core/Client/MqttClientOptions.cs | 2 + .../MQTTnet.TestApp.NetCore.csproj | 13 ++ MQTTnet.TestApp.NetCore/Program.cs | 171 ++++++++++++++++++ MQTTnet.sln | 21 ++- .../MQTTnet.TestApp.NetFramework.csproj | 2 +- Tests/MQTTnet.TestApp.NetFramework/Program.cs | 1 + 10 files changed, 361 insertions(+), 3 deletions(-) create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs create mode 100644 MQTTnet.Core/Client/ConnectionType.cs create mode 100644 MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj create mode 100644 MQTTnet.TestApp.NetCore/Program.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs new file mode 100644 index 0000000..da429c7 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs @@ -0,0 +1,119 @@ +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable + { + private ClientWebSocket _webSocket = null; + private const int BufferSize = 4096; + private const int BufferAmplifier = 20; + private byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize = 0; + private int WebSocketBufferOffset = 0; + + public MqttWebSocketsChannel() + { + _webSocket = new ClientWebSocket(); + } + + public async Task ConnectAsync(MqttClientOptions options) + { + _webSocket = null; + + try + { + _webSocket = new ClientWebSocket(); + + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + finally + { + } + } + + public async Task DisconnectAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + if (_webSocket != null) + _webSocket.Dispose(); + } + + public Task ReadAsync(byte[] buffer) + { + return Task.WhenAll(ReadToBufferAsync(buffer)); + } + + private async Task ReadToBufferAsync(byte[] buffer) + { + var temporaryBuffer = new byte[BufferSize]; + var offset = 0; + + while (_webSocket.State == WebSocketState.Open) + { + if (WebSocketBufferSize == 0) + { + WebSocketBufferOffset = 0; + + WebSocketReceiveResult response; + do + { + response = + await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + + temporaryBuffer.CopyTo(WebSocketBuffer, offset); + offset += response.Count; + temporaryBuffer = new byte[BufferSize]; + } while (!response.EndOfMessage); + + WebSocketBufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + else + { + Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + + return; + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + var writeBuffer = System.Text.Encoding.ASCII.GetString(buffer); + try + { + return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, + CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 4043fd1..de3a289 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -24,6 +24,8 @@ + + diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index f47c494..4d25d10 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using MQTTnet.Implementations; +using MQTTnet.Core.Channel; namespace MQTTnet { @@ -12,7 +13,23 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); + } + + private IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + { + switch (options.ConnectionType) + { + case ConnectionTypes.TCP: + case ConnectionTypes.TLS: + return new MqttTcpChannel(); + case ConnectionTypes.WS: + case ConnectionTypes.WSS: + return new MqttWebSocketsChannel(); + + default: + return null; + } } } } diff --git a/MQTTnet.Core/Client/ConnectionType.cs b/MQTTnet.Core/Client/ConnectionType.cs new file mode 100644 index 0000000..1c6abad --- /dev/null +++ b/MQTTnet.Core/Client/ConnectionType.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace MQTTnet.Core.Client +{ + public enum ConnectionTypes + { + TCP, + TLS, + WS, + WSS + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f9b75fa..8b65299 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; } } diff --git a/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj new file mode 100644 index 0000000..7b69707 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -0,0 +1,13 @@ + + + + Exe + netcoreapp2.0 + + + + + + + + diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs new file mode 100644 index 0000000..df9f9b4 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/Program.cs @@ -0,0 +1,171 @@ +using MQTTnet.Core; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Server; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.TestApp.NetCore +{ + class Program + { + public static void Main(string[] args) + { + Console.WriteLine("MQTTnet - TestApp.NetFramework"); + Console.WriteLine("1 = Start client"); + Console.WriteLine("2 = Start server"); + var pressedKey = Console.ReadKey(true); + if (pressedKey.Key == ConsoleKey.D1) + { + Task.Run(() => RunClientAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + else if (pressedKey.Key == ConsoleKey.D2) + { + Task.Run(() => RunServerAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + } + + private static async Task RunClientAsync(string[] arguments) + { + + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientOptions + { + Server = "localhost", + ClientId = "XYZ", + CleanSession = true, + ConnectionType = ConnectionTypes.WS + }; + + var client = new MqttClientFactory().CreateMqttClient(options); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(); + } + catch (Exception exception) + { + Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + while (true) + { + Console.ReadLine(); + + var applicationMessage = new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes("Hello World"), + MqttQualityOfServiceLevel.AtLeastOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + + private static void RunServerAsync(string[] arguments) + { + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttServerOptions + { + ConnectionValidator = p => + { + if (p.ClientId == "SpecialClient") + { + if (p.Username != "USER" || p.Password != "PASS") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } + } + + return MqttConnectReturnCode.ConnectionAccepted; + } + }; + + var mqttServer = new MqttServerFactory().CreateMqttServer(options); + mqttServer.Start(); + + Console.WriteLine("Press any key to exit."); + Console.ReadLine(); + + mqttServer.Stop(); + } + catch (Exception e) + { + Console.WriteLine(e); + } + + Console.ReadLine(); + } + } +} diff --git a/MQTTnet.sln b/MQTTnet.sln index ef9ec03..4ad2ba6 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.8 +VisualStudioVersion = 15.0.26730.12 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -162,6 +164,22 @@ Global {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -173,6 +191,7 @@ Global {D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj index c865621..2bf571e 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj +++ b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj @@ -90,7 +90,7 @@ - {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} + {a480ef90-0eaa-4d9a-b271-47a9c47f6f7d} MQTTnet.NetFramework diff --git a/Tests/MQTTnet.TestApp.NetFramework/Program.cs b/Tests/MQTTnet.TestApp.NetFramework/Program.cs index 824cc78..427274f 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/Program.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/Program.cs @@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework private static async Task RunClientAsync(string[] arguments) { + MqttTrace.TraceMessagePublished += (s, e) => { Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); From 4a57d050f62406882a2a1df9e72ca91edb540fab Mon Sep 17 00:00:00 2001 From: "IMPAQ\\pnow" Date: Fri, 1 Sep 2017 11:28:31 +0200 Subject: [PATCH 2/2] Clean Up --- .../Implementations/MqttWebSocketsChannel.cs | 18 +++++++++--------- .../MQTTnet.NetStandard/MqttClientFactory.cs | 6 ++++-- MQTTnet.TestApp.NetCore/Program.cs | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs index da429c7..8d51812 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs @@ -10,12 +10,12 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable { - private ClientWebSocket _webSocket = null; + private ClientWebSocket _webSocket; private const int BufferSize = 4096; private const int BufferAmplifier = 20; - private byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; - private int WebSocketBufferSize = 0; - private int WebSocketBufferOffset = 0; + private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize; + private int WebSocketBufferOffset; public MqttWebSocketsChannel() { @@ -36,9 +36,6 @@ namespace MQTTnet.Implementations { throw new MqttCommunicationException(exception); } - finally - { - } } public async Task DisconnectAsync() @@ -49,7 +46,9 @@ namespace MQTTnet.Implementations public void Dispose() { if (_webSocket != null) + { _webSocket.Dispose(); + } } public Task ReadAsync(byte[] buffer) @@ -102,9 +101,10 @@ namespace MQTTnet.Implementations public Task WriteAsync(byte[] buffer) { - if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + if (buffer == null) { + throw new ArgumentNullException(nameof(buffer)); + } - var writeBuffer = System.Text.Encoding.ASCII.GetString(buffer); try { return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index 4d25d10..1cd6d6f 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -11,12 +11,14 @@ namespace MQTTnet { public IMqttClient CreateMqttClient(MqttClientOptions options) { - if (options == null) throw new ArgumentNullException(nameof(options)); + if (options == null) { + throw new ArgumentNullException(nameof(options)); + } return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); } - private IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) { switch (options.ConnectionType) { diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs index df9f9b4..4644471 100644 --- a/MQTTnet.TestApp.NetCore/Program.cs +++ b/MQTTnet.TestApp.NetCore/Program.cs @@ -12,7 +12,7 @@ using System.Threading.Tasks; namespace MQTTnet.TestApp.NetCore { - class Program + public static class Program { public static void Main(string[] args) {