diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs new file mode 100644 index 0000000..da429c7 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs @@ -0,0 +1,119 @@ +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable + { + private ClientWebSocket _webSocket = null; + private const int BufferSize = 4096; + private const int BufferAmplifier = 20; + private byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize = 0; + private int WebSocketBufferOffset = 0; + + public MqttWebSocketsChannel() + { + _webSocket = new ClientWebSocket(); + } + + public async Task ConnectAsync(MqttClientOptions options) + { + _webSocket = null; + + try + { + _webSocket = new ClientWebSocket(); + + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + finally + { + } + } + + public async Task DisconnectAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + if (_webSocket != null) + _webSocket.Dispose(); + } + + public Task ReadAsync(byte[] buffer) + { + return Task.WhenAll(ReadToBufferAsync(buffer)); + } + + private async Task ReadToBufferAsync(byte[] buffer) + { + var temporaryBuffer = new byte[BufferSize]; + var offset = 0; + + while (_webSocket.State == WebSocketState.Open) + { + if (WebSocketBufferSize == 0) + { + WebSocketBufferOffset = 0; + + WebSocketReceiveResult response; + do + { + response = + await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + + temporaryBuffer.CopyTo(WebSocketBuffer, offset); + offset += response.Count; + temporaryBuffer = new byte[BufferSize]; + } while (!response.EndOfMessage); + + WebSocketBufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + else + { + Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + + return; + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + var writeBuffer = System.Text.Encoding.ASCII.GetString(buffer); + try + { + return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, + CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 4043fd1..de3a289 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -24,6 +24,8 @@ + + diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index f47c494..4d25d10 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using MQTTnet.Implementations; +using MQTTnet.Core.Channel; namespace MQTTnet { @@ -12,7 +13,23 @@ namespace MQTTnet { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); + } + + private IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + { + switch (options.ConnectionType) + { + case ConnectionTypes.TCP: + case ConnectionTypes.TLS: + return new MqttTcpChannel(); + case ConnectionTypes.WS: + case ConnectionTypes.WSS: + return new MqttWebSocketsChannel(); + + default: + return null; + } } } } diff --git a/MQTTnet.Core/Client/ConnectionType.cs b/MQTTnet.Core/Client/ConnectionType.cs new file mode 100644 index 0000000..1c6abad --- /dev/null +++ b/MQTTnet.Core/Client/ConnectionType.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace MQTTnet.Core.Client +{ + public enum ConnectionTypes + { + TCP, + TLS, + WS, + WSS + } +} diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f9b75fa..8b65299 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; } } diff --git a/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj new file mode 100644 index 0000000..7b69707 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -0,0 +1,13 @@ + + + + Exe + netcoreapp2.0 + + + + + + + + diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs new file mode 100644 index 0000000..df9f9b4 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/Program.cs @@ -0,0 +1,171 @@ +using MQTTnet.Core; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Server; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.TestApp.NetCore +{ + class Program + { + public static void Main(string[] args) + { + Console.WriteLine("MQTTnet - TestApp.NetFramework"); + Console.WriteLine("1 = Start client"); + Console.WriteLine("2 = Start server"); + var pressedKey = Console.ReadKey(true); + if (pressedKey.Key == ConsoleKey.D1) + { + Task.Run(() => RunClientAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + else if (pressedKey.Key == ConsoleKey.D2) + { + Task.Run(() => RunServerAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + } + + private static async Task RunClientAsync(string[] arguments) + { + + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientOptions + { + Server = "localhost", + ClientId = "XYZ", + CleanSession = true, + ConnectionType = ConnectionTypes.WS + }; + + var client = new MqttClientFactory().CreateMqttClient(options); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(); + } + catch (Exception exception) + { + Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + while (true) + { + Console.ReadLine(); + + var applicationMessage = new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes("Hello World"), + MqttQualityOfServiceLevel.AtLeastOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + + private static void RunServerAsync(string[] arguments) + { + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttServerOptions + { + ConnectionValidator = p => + { + if (p.ClientId == "SpecialClient") + { + if (p.Username != "USER" || p.Password != "PASS") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } + } + + return MqttConnectReturnCode.ConnectionAccepted; + } + }; + + var mqttServer = new MqttServerFactory().CreateMqttServer(options); + mqttServer.Start(); + + Console.WriteLine("Press any key to exit."); + Console.ReadLine(); + + mqttServer.Stop(); + } + catch (Exception e) + { + Console.WriteLine(e); + } + + Console.ReadLine(); + } + } +} diff --git a/MQTTnet.sln b/MQTTnet.sln index ef9ec03..4ad2ba6 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.8 +VisualStudioVersion = 15.0.26730.12 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -162,6 +164,22 @@ Global {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -173,6 +191,7 @@ Global {D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj index c865621..2bf571e 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj +++ b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj @@ -90,7 +90,7 @@ - {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} + {a480ef90-0eaa-4d9a-b271-47a9c47f6f7d} MQTTnet.NetFramework diff --git a/Tests/MQTTnet.TestApp.NetFramework/Program.cs b/Tests/MQTTnet.TestApp.NetFramework/Program.cs index 824cc78..427274f 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/Program.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/Program.cs @@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework private static async Task RunClientAsync(string[] arguments) { + MqttTrace.TraceMessagePublished += (s, e) => { Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");