From 9fa5e2dc1f794d0a215db8cc642f84711cce28b4 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 21:57:51 +0200 Subject: [PATCH 01/19] Add the adapter Endpoint to the connection validator. --- Source/MQTTnet/Server/MqttClientSessionsManager.cs | 7 ++++--- Source/MQTTnet/Server/MqttConnectionValidatorContext.cs | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index a41be4f..34080a0 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -212,7 +212,7 @@ namespace MQTTnet.Server // Switch to the required protocol version before sending any response. clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion; - var connectReturnCode = ValidateConnection(connectPacket); + var connectReturnCode = ValidateConnection(connectPacket, clientAdapter); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { await clientAdapter.SendPacketAsync( @@ -268,7 +268,7 @@ namespace MQTTnet.Server } } - private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) + private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter) { if (_options.ConnectionValidator == null) { @@ -279,7 +279,8 @@ namespace MQTTnet.Server connectPacket.ClientId, connectPacket.Username, connectPacket.Password, - connectPacket.WillMessage); + connectPacket.WillMessage, + clientAdapter.Endpoint); _options.ConnectionValidator(context); return context.ReturnCode; diff --git a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs index a24d795..5857862 100644 --- a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs +++ b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs @@ -4,12 +4,13 @@ namespace MQTTnet.Server { public class MqttConnectionValidatorContext { - public MqttConnectionValidatorContext(string clientId, string username, string password, MqttApplicationMessage willMessage) + public MqttConnectionValidatorContext(string clientId, string username, string password, MqttApplicationMessage willMessage, string endpoint) { ClientId = clientId; Username = username; Password = password; WillMessage = willMessage; + Endpoint = endpoint; } public string ClientId { get; } @@ -20,6 +21,8 @@ namespace MQTTnet.Server public MqttApplicationMessage WillMessage { get; } + public string Endpoint { get; } + public MqttConnectReturnCode ReturnCode { get; set; } = MqttConnectReturnCode.ConnectionAccepted; } } From 7fd09009e7ea88914c1fdbb77ff59ef4205d5848 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 21:58:00 +0200 Subject: [PATCH 02/19] Update docs. --- Build/MQTTnet.nuspec | 28 +-------------------- Source/MQTTnet.Extensions.Rpc/SampleCCode.c | 2 +- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 5cdf3d5..3d3375c 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,33 +10,7 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - ** MQTTnet is now available at Open Collective for donations (https://opencollective.com/mqttnet). ** -* [Core] Performance optimizations. -* [Core] Due to performance reasons the timestamp of log messages is now in UTC format. -* [Core] Added several packet validations. -* [Core] Log messages now contain the complete source path including parent components. -* [Core] The adapter now has an _Endpoint_ definition as string containing remote IP and port. -* [Client] Received messages are now processed completely in the worker thread without creating new Tasks. -* [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger) -* [Client] A clean disconnect (via DisconnectAsync) will no longer throw an exception. -* [Client] Added new overloads for quick message publishing. -* [ManagedClient] The managed client is moved to a separate nuget package. -* [ManagedClient] Added an own message format with extended properties like ID (BREAKING CHANGE). -* [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta). -* [ManagedClient] Added a new event which is fired when a synchronization of the subscriptions has failed. -* [ManagedClient] Added a new event which is fired when a connection attempt has failed. -* [ManagedClient] Exposed a new property which provides the count of not published messages (pending messages count). -* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot). -* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client. -* [Server] The pending messages queue per client is now limited to 250 messages. Overflow strategy and count can be changed via options (thanks to @VladimirAkopyan) -* [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely. -* [Server] Rewritten the _ConnectedClients_ API and added new features for disconnecting and Endpoint information (IP etc.). -* [Server] Added settings for disabling persistent sessions and defining a max pending messages queue size per session. -* [Server] Persistent sessions are disabled by default (BREAKING CHANGE!). -* [Server] Added a new interceptor which is invoked before a new message is added to the client queue. -* [Server] Added support for Linux servers by dividing IPv4 and IPv6 support and adding new options (BREAKING CHANGE!). -* [Server] Gracefully closed connections are no longer reported as warnings. -* [Server] Added new overloads for initializing the ASP.NET Core integration. + Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet.Extensions.Rpc/SampleCCode.c b/Source/MQTTnet.Extensions.Rpc/SampleCCode.c index c20aa4a..abcd655 100644 --- a/Source/MQTTnet.Extensions.Rpc/SampleCCode.c +++ b/Source/MQTTnet.Extensions.Rpc/SampleCCode.c @@ -3,7 +3,7 @@ _mqttClient.subscribe("MQTTnet.RPC/+/ping"); _mqttClient.subscribe("MQTTnet.RPC/+/do_something"); // It is not allowed to change the structure of the topic. Otherwise RPC will not work. So method names can be separated using -// an _ or . but no +, # or . If it is required to distinguish between devices own rules can be defined like the following. +// an _ or . but no +, # or /. If it is required to distinguish between devices own rules can be defined like the following. _mqttClient.subscribe("MQTTnet.RPC/+/deviceA.ping"); _mqttClient.subscribe("MQTTnet.RPC/+/deviceB.ping"); _mqttClient.subscribe("MQTTnet.RPC/+/deviceC.getTemperature"); From 7d9326a1471e301774d859c9c94e14ae9e1472b6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:00:34 +0200 Subject: [PATCH 03/19] Extend MqttPacketReader constructor to allow more unit tests. --- Source/MQTTnet.AspnetCore/ReaderExtensions.cs | 3 +- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 2 +- Source/MQTTnet/Internal/TestMqttChannel.cs | 10 ++-- .../Serializer/MqttPacketBodyReader.cs | 17 +++--- Source/MQTTnet/Serializer/MqttPacketReader.cs | 3 +- .../MQTTnet.Benchmarks/SerializerBenchmark.cs | 2 +- .../MqttPacketSerializerTests.cs | 56 +++++++++++++++++-- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 5 +- 8 files changed, 76 insertions(+), 22 deletions(-) diff --git a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs index b893e43..6c770b9 100644 --- a/Source/MQTTnet.AspnetCore/ReaderExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs @@ -32,7 +32,8 @@ namespace MQTTnet.AspNetCore } var bodySlice = copy.Slice(0, bodyLength); - packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(bodySlice.GetArray(), 0))); + var buffer = bodySlice.GetArray(); + packet = serializer.Deserialize(new ReceivedMqttPacket(fixedheader, new MqttPacketBodyReader(buffer, 0, buffer.Length))); consumed = bodySlice.End; observed = bodySlice.End; return true; diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 0059f20..7396515 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -201,7 +201,7 @@ namespace MQTTnet.Adapter bodyOffset += readBytes; } while (bodyOffset < body.Length); - return new ReceivedMqttPacket(fixedHeader.Flags, new MqttPacketBodyReader(body, 0)); + return new ReceivedMqttPacket(fixedHeader.Flags, new MqttPacketBodyReader(body, 0, body.Length)); } finally { diff --git a/Source/MQTTnet/Internal/TestMqttChannel.cs b/Source/MQTTnet/Internal/TestMqttChannel.cs index e48b8ed..08920e3 100644 --- a/Source/MQTTnet/Internal/TestMqttChannel.cs +++ b/Source/MQTTnet/Internal/TestMqttChannel.cs @@ -14,11 +14,7 @@ namespace MQTTnet.Internal _stream = stream; } - public void Dispose() - { - } - - public string Endpoint { get; } + public string Endpoint { get; } = ""; public Task ConnectAsync(CancellationToken cancellationToken) { @@ -39,5 +35,9 @@ namespace MQTTnet.Internal { return _stream.WriteAsync(buffer, offset, count, cancellationToken); } + + public void Dispose() + { + } } } diff --git a/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs b/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs index 2cf37dd..f0e1b56 100644 --- a/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs @@ -6,17 +6,20 @@ namespace MQTTnet.Serializer public class MqttPacketBodyReader { private readonly byte[] _buffer; - private int _offset; + private readonly int _length; - public MqttPacketBodyReader(byte[] buffer, int offset) + private int _offset; + + public MqttPacketBodyReader(byte[] buffer, int offset, int length) { _buffer = buffer; _offset = offset; + _length = length; } - public int Length => _buffer.Length - _offset; + public int Length => _length - _offset; - public bool EndOfStream => _offset == _buffer.Length; + public bool EndOfStream => _offset == _length; public byte ReadByte() { @@ -26,7 +29,7 @@ namespace MQTTnet.Serializer public ArraySegment ReadRemainingData() { - return new ArraySegment(_buffer, _offset, _buffer.Length - _offset); + return new ArraySegment(_buffer, _offset, _length - _offset); } public ushort ReadUInt16() @@ -53,9 +56,9 @@ namespace MQTTnet.Serializer private void ValidateReceiveBuffer(ushort length) { - if (_buffer.Length < _offset + length) + if (_length < _offset + length) { - throw new ArgumentOutOfRangeException(nameof(_buffer), $"expected at least {_offset + length} bytes but there are only {_buffer.Length} bytes"); + throw new ArgumentOutOfRangeException(nameof(_buffer), $"expected at least {_offset + length} bytes but there are only {_length} bytes"); } } diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index 32a9285..ff8b54b 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -1,5 +1,4 @@ -using System; -using System.Threading; +using System.Threading; using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Exceptions; diff --git a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs index 8beb7b4..9b85580 100644 --- a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -58,7 +58,7 @@ namespace MQTTnet.Benchmarks var receivedPacket = new ReceivedMqttPacket( header.Flags, - new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength)); + new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength, _serializedPacket.Array.Length)); _serializer.Deserialize(receivedPacket); } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index d8d23e2..c3a75f8 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Linq; using System.Text; using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -149,6 +150,53 @@ namespace MQTTnet.Core.Tests DeserializeAndCompare(p, "IAIABQ==", MqttProtocolVersion.V310); } + [TestMethod] + public void Serialize_LargePacket() + { + var serializer = new MqttPacketSerializer { ProtocolVersion = MqttProtocolVersion.V311 }; + + const int payloadLength = 80000; + + var payload = new byte[payloadLength]; + + var value = 0; + for (var i = 0; i < payloadLength; i++) + { + if (value > 255) + { + value = 0; + } + + payload[i] = (byte)value; + } + + var publishPacket = new MqttPublishPacket + { + Topic = "abcdefghijklmnopqrstuvwxyz0123456789", + Payload = payload + }; + + var buffer = serializer.Serialize(publishPacket); + var testChannel = new TestMqttChannel(new MemoryStream(buffer.Array, buffer.Offset, buffer.Count)); + + var header = MqttPacketReader.ReadFixedHeaderAsync( + testChannel, + new byte[2], + new byte[1], + CancellationToken.None).GetAwaiter().GetResult(); + + var eof = buffer.Offset + buffer.Count; + + var receivedPacket = new ReceivedMqttPacket( + header.Flags, + new MqttPacketBodyReader(buffer.Array, eof - header.RemainingLength, buffer.Count + buffer.Offset)); + + var packet = (MqttPublishPacket)serializer.Deserialize(receivedPacket); + + Assert.AreEqual(publishPacket.Topic, packet.Topic); + Assert.IsTrue(publishPacket.Payload.SequenceEqual(packet.Payload)); + } + [TestMethod] public void SerializeV311_MqttDisconnectPacket() { @@ -463,7 +511,7 @@ namespace MQTTnet.Core.Tests { var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; var data = serializer.Serialize(packet); - + Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(data))); } @@ -479,10 +527,10 @@ namespace MQTTnet.Core.Tests var fixedHeader = new byte[2]; var singleByteBuffer = new byte[1]; var header = MqttPacketReader.ReadFixedHeaderAsync(channel, fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult(); - + using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength)) { - var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0))); + var deserializedPacket = serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0, (int)bodyStream.Length))); var buffer2 = serializer.Serialize(deserializedPacket); Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(buffer2))); @@ -507,7 +555,7 @@ namespace MQTTnet.Core.Tests using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength)) { - return (T)serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0))); + return (T)serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0, (int)bodyStream.Length))); } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index ea258f5..95bb67d 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -375,6 +375,9 @@ namespace MQTTnet.Core.Tests var c1 = await serverAdapter.ConnectTestClient("c1"); await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + + await Task.Delay(250); + await c1.DisconnectAsync(); } finally @@ -396,7 +399,7 @@ namespace MQTTnet.Core.Tests c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); - await Task.Delay(500); + await Task.Delay(250); } finally { From bf1d682a854ca2cf5e880c10a92b658410deb8fc Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:00:48 +0200 Subject: [PATCH 04/19] Update libs --- Source/MQTTnet/MQTTnet.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 26bf002..984b563 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -20,7 +20,7 @@ UAP,Version=v10.0 UAP 10.0.17134.0 - 10.0.10240.0 + 10.0.10586.212 .NETCore v5.0 $(DefineConstants);WINDOWS_UWP @@ -53,7 +53,7 @@ - + From bfb6ec885692111bcb3d8ff78547ff6ee3089410 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:02:42 +0200 Subject: [PATCH 05/19] Update UWP test app. --- .../MQTTnet.TestApp.UniversalWindows.csproj | 2 +- .../MainPage.xaml | 127 ++++++++++++------ .../MainPage.xaml.cs | 100 ++++++++++---- 3 files changed, 163 insertions(+), 66 deletions(-) diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj index d709083..a6f4566 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj @@ -142,7 +142,7 @@ - 6.1.4 + 6.1.5 3.0.0 diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index 41617df..c99ebd8 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -5,7 +5,6 @@ xmlns:d="http://schemas.microsoft.com/expression/blend/2008" xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" xmlns:server="using:MQTTnet.Server" - xmlns:interop="using:Windows.UI.Xaml.Interop" d:DesignHeight="800" d:DesignWidth="800" mc:Ignorable="d"> @@ -19,26 +18,55 @@ - Server: - - Port: - - User: - - Password: - - ClientId: - - Clean session: - - Keep alive interval: - - - - TCP - WS - Use TLS - + + + + + + + + + + + + + + + + + + + Server: + + + Port: + + + Username: + + + Password: + + + Client ID: + + + Keep Alive interval: + + + Protocol: + + TCP + WS + Use TLS + + + + Clean session + Use managed client @@ -49,25 +77,46 @@ - Topic: - + + + + + + + + + + + + + + + + Topic: + + + Payload: + + + Payload format: + + Plain text + Base64 string + + + QoS level: + + 0 (At most once) + 1 (At least once) + 2 (Exactly once) + + + + Retain - Payload: - - - Text - Base64 - - - Retain: - - - QoS: - - 0 (At most once) - 1 (At least once) - 2 (Exactly once) - diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 387364a..b557fe0 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.ObjectModel; +using System.IO; using System.Text; using System.Threading.Tasks; using Windows.Security.Cryptography.Certificates; @@ -25,12 +26,15 @@ namespace MQTTnet.TestApp.UniversalWindows private readonly ObservableCollection _sessions = new ObservableCollection(); private IMqttClient _mqttClient; + private IManagedMqttClient _managedMqttClient; private IMqttServer _mqttServer; public MainPage() { InitializeComponent(); + ClientId.Text = Guid.NewGuid().ToString("D"); + MqttNetGlobalLogger.LogMessagePublished += OnTraceMessagePublished; } @@ -81,7 +85,10 @@ namespace MQTTnet.TestApp.UniversalWindows AllowUntrustedCertificates = true }; - var options = new MqttClientOptions { ClientId = ClientId.Text }; + var options = new MqttClientOptions + { + ClientId = ClientId.Text + }; if (UseTcp.IsChecked == true) { @@ -127,12 +134,28 @@ namespace MQTTnet.TestApp.UniversalWindows } var factory = new MqttFactory(); - _mqttClient = factory.CreateMqttClient(); - _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; - _mqttClient.Connected += OnConnected; - _mqttClient.Disconnected += OnDisconnected; - await _mqttClient.ConnectAsync(options); + if (UseManagedClient.IsChecked == true) + { + _managedMqttClient = factory.CreateManagedMqttClient(); + _managedMqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + _managedMqttClient.Connected += OnConnected; + _managedMqttClient.Disconnected += OnDisconnected; + + await _managedMqttClient.StartAsync(new ManagedMqttClientOptions + { + ClientOptions = options + }); + } + else + { + _mqttClient = factory.CreateMqttClient(); + _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; + _mqttClient.Connected += OnConnected; + _mqttClient.Disconnected += OnDisconnected; + + await _mqttClient.ConnectAsync(options); + } } catch (Exception exception) { @@ -171,11 +194,6 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Publish(object sender, RoutedEventArgs e) { - if (_mqttClient == null) - { - return; - } - try { var qos = MqttQualityOfServiceLevel.AtMostOnce; @@ -190,7 +208,7 @@ namespace MQTTnet.TestApp.UniversalWindows } var payload = new byte[0]; - if (Text.IsChecked == true) + if (PlainText.IsChecked == true) { payload = Encoding.UTF8.GetBytes(Payload.Text); } @@ -207,7 +225,15 @@ namespace MQTTnet.TestApp.UniversalWindows .WithRetainFlag(Retain.IsChecked == true) .Build(); - await _mqttClient.PublishAsync(message); + if (_mqttClient != null) + { + await _mqttClient.PublishAsync(message); + } + + if (_managedMqttClient != null) + { + await _managedMqttClient.PublishAsync(message); + } } catch (Exception exception) { @@ -219,7 +245,19 @@ namespace MQTTnet.TestApp.UniversalWindows { try { - await _mqttClient.DisconnectAsync(); + if (_mqttClient != null) + { + await _mqttClient.DisconnectAsync(); + _mqttClient.Dispose(); + _mqttClient = null; + } + + if (_managedMqttClient != null) + { + await _managedMqttClient.StopAsync(); + _managedMqttClient.Dispose(); + _managedMqttClient = null; + } } catch (Exception exception) { @@ -239,11 +277,6 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Subscribe(object sender, RoutedEventArgs e) { - if (_mqttClient == null) - { - return; - } - try { var qos = MqttQualityOfServiceLevel.AtMostOnce; @@ -257,7 +290,15 @@ namespace MQTTnet.TestApp.UniversalWindows qos = MqttQualityOfServiceLevel.ExactlyOnce; } - await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos)); + if (_mqttClient != null) + { + await _mqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos)); + } + + if (_managedMqttClient != null) + { + await _managedMqttClient.SubscribeAsync(new TopicFilter(SubscribeTopic.Text, qos)); + } } catch (Exception exception) { @@ -267,14 +308,17 @@ namespace MQTTnet.TestApp.UniversalWindows private async void Unsubscribe(object sender, RoutedEventArgs e) { - if (_mqttClient == null) - { - return; - } - try { - await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text); + if (_mqttClient != null) + { + await _mqttClient.UnsubscribeAsync(SubscribeTopic.Text); + } + + if (_managedMqttClient != null) + { + await _managedMqttClient.UnsubscribeAsync(SubscribeTopic.Text); + } } catch (Exception exception) { @@ -398,6 +442,8 @@ namespace MQTTnet.TestApp.UniversalWindows ListViewSessions.DataContext = _sessions; } + #region Wiki Code + private async Task WikiCode() { { @@ -632,5 +678,7 @@ namespace MQTTnet.TestApp.UniversalWindows await mqttClient.StartAsync(options); } } + + #endregion } } From 3d0ecc05ff876f6669a7e8eed54781624903e3b2 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:03:19 +0200 Subject: [PATCH 06/19] Fix bug which prevents receiving large packets for UWP. --- Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs index 2ccc7d9..8ae9192 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs @@ -154,7 +154,11 @@ namespace MQTTnet.Implementations private void CreateStreams() { - _readStream = _socket.InputStream.AsStreamForRead(_bufferSize); + // Attention! Do not set the buffer for the read method. This will + // limit the internal buffer and the read operation will hang forever + // is more data than the buffer size was received. + _readStream = _socket.InputStream.AsStreamForRead(); + _writeStream = _socket.OutputStream.AsStreamForWrite(_bufferSize); } From 35300abfb5e942ccf30d126ea4e31e417291367b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:03:36 +0200 Subject: [PATCH 07/19] Fix typo. --- Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs index 8ae9192..6da9da2 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs @@ -156,7 +156,7 @@ namespace MQTTnet.Implementations { // Attention! Do not set the buffer for the read method. This will // limit the internal buffer and the read operation will hang forever - // is more data than the buffer size was received. + // if more data than the buffer size was received. _readStream = _socket.InputStream.AsStreamForRead(); _writeStream = _socket.OutputStream.AsStreamForWrite(_bufferSize); From 2b420b8e66cccaf6301c4e051cf75b3a2cfb514a Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:20:02 +0200 Subject: [PATCH 08/19] Add support for disabling server endpoints (ipv4 or ipv6) --- .../Implementations/MqttTcpServerAdapter.cs | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index eeefb34..cfbf725 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -1,6 +1,7 @@ #if !WINDOWS_UWP using System; using System.Collections.Generic; +using System.Net; using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading; @@ -79,27 +80,33 @@ namespace MQTTnet.Implementations private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate) { - var listenerV4 = new MqttTcpServerListener( - AddressFamily.InterNetwork, - options, - tlsCertificate, - _cancellationTokenSource.Token, - _logger); - - listenerV4.ClientAccepted += OnClientAccepted; - listenerV4.Start(); - _listeners.Add(listenerV4); - - var listenerV6 = new MqttTcpServerListener( - AddressFamily.InterNetworkV6, - options, - tlsCertificate, - _cancellationTokenSource.Token, - _logger); - - listenerV6.ClientAccepted += OnClientAccepted; - listenerV6.Start(); - _listeners.Add(listenerV6); + if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) + { + var listenerV4 = new MqttTcpServerListener( + AddressFamily.InterNetwork, + options, + tlsCertificate, + _cancellationTokenSource.Token, + _logger); + + listenerV4.ClientAccepted += OnClientAccepted; + listenerV4.Start(); + _listeners.Add(listenerV4); + } + + if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None)) + { + var listenerV6 = new MqttTcpServerListener( + AddressFamily.InterNetworkV6, + options, + tlsCertificate, + _cancellationTokenSource.Token, + _logger); + + listenerV6.ClientAccepted += OnClientAccepted; + listenerV6.Start(); + _listeners.Add(listenerV6); + } } private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs e) From ce534baa750bf5f4a89d38f173c13f65d9253660 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 4 Jul 2018 22:20:24 +0200 Subject: [PATCH 09/19] Move the connection check interval of the ManagedClient to the options. --- .../IManagedMqttClientOptions.cs | 2 ++ Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs | 2 +- .../ManagedMqttClientOptions.cs | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs index 77736c4..8d48bab 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs @@ -9,6 +9,8 @@ namespace MQTTnet.Extensions.ManagedClient TimeSpan AutoReconnectDelay { get; } + TimeSpan ConnectionCheckInterval { get; } + IManagedMqttClientStorage Storage { get; } } } \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index bcd98ae..2eabce3 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -206,7 +206,7 @@ namespace MQTTnet.Extensions.ManagedClient if (connectionState == ReconnectionResult.StillConnected) { - await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); + await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs index e07e2d6..cecf227 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientOptions.cs @@ -9,6 +9,8 @@ namespace MQTTnet.Extensions.ManagedClient public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); + public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1); + public IManagedMqttClientStorage Storage { get; set; } } } From cd257b4d252a396f7801f55afc91db3bcea6f190 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Thu, 5 Jul 2018 20:31:01 +0200 Subject: [PATCH 10/19] hide overload that does not provide options --- Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs index 8b65bbe..b5d0440 100644 --- a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs @@ -44,7 +44,7 @@ namespace MQTTnet.AspNetCore return services; } - public static IServiceCollection AddHostedMqttServer(this IServiceCollection services) + private static IServiceCollection AddHostedMqttServer(this IServiceCollection services) { var logger = new MqttNetLogger(); var childLogger = logger.CreateChildLogger(); From 2f10295022cade8326b966a5d7cf81ca1e292e93 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 5 Jul 2018 21:20:01 +0200 Subject: [PATCH 11/19] Fix message handling for UWP. --- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 17 +++-- Source/MQTTnet/Serializer/MqttPacketReader.cs | 74 ++++++++++++++++--- Source/MQTTnet/Serializer/MqttPacketWriter.cs | 2 - 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 7396515..f909db1 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -97,13 +97,11 @@ namespace MQTTnet.Adapter await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { - _logger.Verbose("TX >>> {0}", packet); - var packetData = PacketSerializer.Serialize(packet); - await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false); - PacketSerializer.FreeBuffer(); + + _logger.Verbose("TX >>> {0}", packet); } catch (Exception exception) { @@ -149,7 +147,7 @@ namespace MQTTnet.Adapter } _logger.Verbose("RX <<< {0}", packet); - + return packet; } catch (Exception exception) @@ -190,9 +188,16 @@ namespace MQTTnet.Adapter chunkSize = bytesLeft; } +#if WINDOWS_UWP + var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); +#else // async/await is not used to avoid the overhead of context switches. We assume that the reamining data // has been sent from the sender directly after the initial bytes. - var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).GetAwaiter().GetResult(); + var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); +#endif + + cancellationToken.ThrowIfCancellationRequested(); + if (readBytes <= 0) { ExceptionHelper.ThrowGracefulSocketClose(); diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index ff8b54b..73e8a66 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -34,50 +34,104 @@ namespace MQTTnet.Serializer { return new MqttFixedHeader(buffer[0], 0); } - + +#if WINDOWS_UWP + // UWP will have a dead lock when calling this not async. + var bodyLength = await ReadBodyLengthAsync(channel, buffer[1], singleByteBuffer, cancellationToken).ConfigureAwait(false); +#else + // Here the async/await pattern is not used becuase the overhead of context switches + // is too big for reading 1 byte in a row. We expect that the remaining data was sent + // directly after the initial bytes. If the client disconnects just in this moment we + // will get an exception anyway. var bodyLength = ReadBodyLength(channel, buffer[1], singleByteBuffer, cancellationToken); +#endif + return new MqttFixedHeader(buffer[0], bodyLength); } +#if !WINDOWS_UWP private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken) { - // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. + var offset = 0; var multiplier = 128; var value = initialEncodedByte & 127; int encodedByte = initialEncodedByte; while ((encodedByte & 128) != 0) { + offset++; + if (offset > 3) + { + throw new MqttProtocolViolationException("Remaining length is invalid."); + } + cancellationToken.ThrowIfCancellationRequested(); - // Here the async/await pattern is not used becuase the overhead of context switches - // is too big for reading 1 byte in a row. We expect that the remaining data was sent - // directly after the initial bytes. If the client disconnects just in this moment we - // will get an exception anyway. encodedByte = ReadByte(channel, singleByteBuffer, cancellationToken); value += (byte)(encodedByte & 127) * multiplier; - if (multiplier > 128 * 128 * 128) + multiplier *= 128; + } + + return value; + } + + private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) + { + var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); + + cancellationToken.ThrowIfCancellationRequested(); + + if (readCount <= 0) + { + ExceptionHelper.ThrowGracefulSocketClose(); + } + + return singleByteBuffer[0]; + } + +#else + + private static async Task ReadBodyLengthAsync(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken) + { + var offset = 0; + var multiplier = 128; + var value = initialEncodedByte & 127; + int encodedByte = initialEncodedByte; + + while ((encodedByte & 128) != 0) + { + offset++; + if (offset > 3) { throw new MqttProtocolViolationException("Remaining length is invalid."); } + cancellationToken.ThrowIfCancellationRequested(); + + encodedByte = await ReadByteAsync(channel, singleByteBuffer, cancellationToken).ConfigureAwait(false); + + value += (byte)(encodedByte & 127) * multiplier; multiplier *= 128; } return value; } - private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) + private static async Task ReadByteAsync(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) { - var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); + var readCount = await channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + if (readCount <= 0) { - cancellationToken.ThrowIfCancellationRequested(); ExceptionHelper.ThrowGracefulSocketClose(); } return singleByteBuffer[0]; } + +#endif } } diff --git a/Source/MQTTnet/Serializer/MqttPacketWriter.cs b/Source/MQTTnet/Serializer/MqttPacketWriter.cs index c7f1bc2..67fd467 100644 --- a/Source/MQTTnet/Serializer/MqttPacketWriter.cs +++ b/Source/MQTTnet/Serializer/MqttPacketWriter.cs @@ -29,7 +29,6 @@ namespace MQTTnet.Serializer public static ArraySegment EncodeRemainingLength(int length) { - // write the encoded remaining length right aligned on the 4 byte buffer if (length <= 0) { return new ArraySegment(new byte[1], 0, 1); @@ -38,7 +37,6 @@ namespace MQTTnet.Serializer var buffer = new byte[4]; var bufferOffset = 0; - // Algorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var x = length; do { From 8035c6859f4b5ac614b81dd443fe733508bfaa20 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 5 Jul 2018 21:20:08 +0200 Subject: [PATCH 12/19] Update docs. --- Build/MQTTnet.nuspec | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 3d3375c..52acf56 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,7 +10,11 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - + * [Core] Performance optimizations. +* [Core] Fixed a bug which prevents receiving large packets (UWP only) +* [Client] The ManagedClient options now allow configuring the interval for connection checks. +* [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback. +* [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin From bc6f155f125aa6266b8f9776568045a9cc267cb7 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 7 Jul 2018 10:42:33 +0200 Subject: [PATCH 13/19] using connectionhandlers with websockets --- .../ApplicationBuilderExtensions.cs | 14 ++++++++++---- Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs | 9 +++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs b/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs index 22b9e93..2125c7d 100644 --- a/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs +++ b/Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs @@ -3,6 +3,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using System.Linq; using MQTTnet.Server; +using System.Collections.Generic; namespace MQTTnet.AspNetCore { @@ -23,10 +24,7 @@ namespace MQTTnet.AspNetCore if (context.Request.Headers.TryGetValue("Sec-WebSocket-Protocol", out var requestedSubProtocolValues)) { - // Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc. - subProtocol = requestedSubProtocolValues - .OrderByDescending(p => p.Length) - .FirstOrDefault(p => p.ToLower().StartsWith("mqtt")); + subProtocol = SelectSubProtocol(requestedSubProtocolValues); } var adapter = app.ApplicationServices.GetRequiredService(); @@ -40,6 +38,14 @@ namespace MQTTnet.AspNetCore return app; } + public static string SelectSubProtocol(IList requestedSubProtocolValues) + { + // Order the protocols to also match "mqtt", "mqttv-3.1", "mqttv-3.11" etc. + return requestedSubProtocolValues + .OrderByDescending(p => p.Length) + .FirstOrDefault(p => p.ToLower().StartsWith("mqtt")); + } + public static IApplicationBuilder UseMqttServer(this IApplicationBuilder app, Action configure) { var server = app.ApplicationServices.GetRequiredService(); diff --git a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs index f52040a..db352f3 100644 --- a/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs +++ b/Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs @@ -22,13 +22,18 @@ namespace MQTTnet.TestApp.AspNetCore2 .Build(); services .AddHostedMqttServer(mqttServerOptions) - .AddMqttConnectionHandler(); + .AddMqttConnectionHandler() + .AddConnections(); } // In class _Startup_ of the ASP.NET Core 2.0 project. public void Configure(IApplicationBuilder app, IHostingEnvironment env) { - app.UseMqttEndpoint(); + app.UseConnections(c => c.MapConnectionHandler("/mqtt", options => { + options.WebSockets.SubProtocolSelector = MQTTnet.AspNetCore.ApplicationBuilderExtensions.SelectSubProtocol; + })); + + //app.UseMqttEndpoint(); app.UseMqttServer(server => { server.Started += async (sender, args) => From 978694b3c05dd6dff239e6e760056a918829a7b9 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 7 Jul 2018 10:52:24 +0200 Subject: [PATCH 14/19] set binary mode --- Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs index 49a5c09..2390389 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs @@ -1,4 +1,5 @@ using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Connections.Features; using MQTTnet.Adapter; using MQTTnet.Serializer; using MQTTnet.Server; @@ -13,6 +14,14 @@ namespace MQTTnet.AspNetCore public override async Task OnConnectedAsync(ConnectionContext connection) { + // required for websocket transport to work + var transferFormatFeature = connection.Features.Get(); + if (transferFormatFeature != null) + { + transferFormatFeature.ActiveFormat = TransferFormat.Binary; + } + + var serializer = new MqttPacketSerializer(); using (var adapter = new MqttConnectionContext(serializer, connection)) { From 9efee7af21ec08fe4881c07a0175c49bb69e5341 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 7 Jul 2018 21:03:31 +0200 Subject: [PATCH 15/19] Refactoring. --- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 18 +++++++----------- Source/MQTTnet/Internal/ExceptionHelper.cs | 8 ++++++++ Source/MQTTnet/Serializer/MqttPacketReader.cs | 8 +++----- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index f909db1..4e61733 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -128,11 +128,11 @@ namespace MQTTnet.Adapter if (timeout > TimeSpan.Zero) { - receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false); + receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false); } else { - receivedMqttPacket = await ReceiveAsync(_channel, cancellationToken).ConfigureAwait(false); + receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false); } if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) @@ -163,9 +163,9 @@ namespace MQTTnet.Adapter return null; } - private async Task ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) + private async Task ReceiveAsync(CancellationToken cancellationToken) { - var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false); + var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(_channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false); try { @@ -189,19 +189,15 @@ namespace MQTTnet.Adapter } #if WINDOWS_UWP - var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); + var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false); #else // async/await is not used to avoid the overhead of context switches. We assume that the reamining data // has been sent from the sender directly after the initial bytes. - var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); + var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); #endif cancellationToken.ThrowIfCancellationRequested(); - - if (readBytes <= 0) - { - ExceptionHelper.ThrowGracefulSocketClose(); - } + ExceptionHelper.ThrowIfGracefulSocketClose(readBytes); bodyOffset += readBytes; } while (bodyOffset < body.Length); diff --git a/Source/MQTTnet/Internal/ExceptionHelper.cs b/Source/MQTTnet/Internal/ExceptionHelper.cs index f3cadc9..5bc8e43 100644 --- a/Source/MQTTnet/Internal/ExceptionHelper.cs +++ b/Source/MQTTnet/Internal/ExceptionHelper.cs @@ -8,5 +8,13 @@ namespace MQTTnet.Internal { throw new MqttCommunicationClosedGracefullyException(); } + + public static void ThrowIfGracefulSocketClose(int readBytesCount) + { + if (readBytesCount <= 0) + { + throw new MqttCommunicationClosedGracefullyException(); + } + } } } diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index 73e8a66..f115b50 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -20,11 +20,9 @@ namespace MQTTnet.Serializer while (totalBytesRead < buffer.Length) { var bytesRead = await channel.ReadAsync(buffer, totalBytesRead, buffer.Length - totalBytesRead, cancellationToken).ConfigureAwait(false); - if (bytesRead <= 0) - { - cancellationToken.ThrowIfCancellationRequested(); - ExceptionHelper.ThrowGracefulSocketClose(); - } + + cancellationToken.ThrowIfCancellationRequested(); + ExceptionHelper.ThrowIfGracefulSocketClose(bytesRead); totalBytesRead += bytesRead; } From acbf1226ab896e18155bb49de949e3723a58ff43 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 7 Jul 2018 21:22:08 +0200 Subject: [PATCH 16/19] Fix a bug in the keep alive monitor. --- Build/MQTTnet.nuspec | 1 + Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 52acf56..34e563b 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -15,6 +15,7 @@ * [Client] The ManagedClient options now allow configuring the interval for connection checks. * [Server] Added the Endpoint of the Adapter (remote IP and port) to the connection validation callback. * [Server] The ipv4 and ipv6 endpoint can be disabled now by setting the bound IP address to _None_. +* [Server] Fix a bug in the keep alive monitor which caused high CPU load (thanks to @GarageGadget). Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index b362861..8afb7ac 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -70,7 +70,9 @@ namespace MQTTnet.Server while (!cancellationToken.IsCancellationRequested) { // Values described here: [MQTT-3.1.2-24]. - if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D) + // If the client sends 5 sec. the server will allow up to 7.5 seconds. + // If the client sends 1 sec. the server will allow up to 1.5 seconds. + if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D) { _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId); _clientSession.Stop(MqttClientDisconnectType.NotClean); @@ -78,7 +80,11 @@ namespace MQTTnet.Server return; } - await Task.Delay(keepAlivePeriod, cancellationToken).ConfigureAwait(false); + // The server checks the keep alive timeout every 50 % of the overall keep alive timeout + // because the server allows 1.5 times the keep alive value. This means that a value of 5 allows + // up to 7.5 seconds. With an interval of 2.5 (5 / 2) the 7.5 is also affected. Waiting the whole + // keep alive time will hit at 10 instead of 7.5 (but only one time instead of two times). + await Task.Delay(TimeSpan.FromSeconds(keepAlivePeriod * 0.5D), cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) From dee0adcd8c61436cf2f88bced371ec9256ca3af3 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 7 Jul 2018 22:46:07 +0200 Subject: [PATCH 17/19] Add new overloads. --- Source/MQTTnet/MqttFactory.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index e830ad2..3f1e1ac 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -22,12 +22,19 @@ namespace MQTTnet return new MqttClient(new MqttClientAdapterFactory(), logger); } - public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory mqttClientAdapterFactory) + public IMqttClient CreateMqttClient(IMqttClientAdapterFactory adapterFactory) + { + if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory)); + + return new MqttClient(adapterFactory, new MqttNetLogger()); + } + + public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory adapterFactory) { if (logger == null) throw new ArgumentNullException(nameof(logger)); - if (mqttClientAdapterFactory == null) throw new ArgumentNullException(nameof(mqttClientAdapterFactory)); + if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory)); - return new MqttClient(mqttClientAdapterFactory, logger); + return new MqttClient(adapterFactory, logger); } public IMqttServer CreateMqttServer() From 16ad07cfc93a8d8db553e083c7fcdb31c11c4aab Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 8 Jul 2018 10:34:46 +0200 Subject: [PATCH 18/19] Add wrapper for WebSocket4Net. --- .../WebSocket4NetMqttClientAdapterFactory.cs | 134 ++++++++++++++++++ .../PublicBrokerTest.cs | 7 +- 2 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs diff --git a/Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs b/Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs new file mode 100644 index 0000000..ed29be8 --- /dev/null +++ b/Source/MQTTnet.Extensions.Wrappers.WebSocket4Net/WebSocket4NetMqttClientAdapterFactory.cs @@ -0,0 +1,134 @@ +using MQTTnet.Client; +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Security.Authentication; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Adapter; +using MQTTnet.Channel; +using MQTTnet.Diagnostics; +using MQTTnet.Serializer; +using WebSocket4Net; + +namespace MQTTnet.TestApp.NetCore +{ + public class WebSocket4NetMqttClientAdapterFactory : IMqttClientAdapterFactory + { + public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetChildLogger logger) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); + + if (!(options.ChannelOptions is MqttClientWebSocketOptions)) + { + throw new NotSupportedException("Only WebSocket connections are supported."); + } + + return new MqttChannelAdapter(new WebSocket4NetMqttChannel(options), new MqttPacketSerializer(), logger); + } + + private class WebSocket4NetMqttChannel : IMqttChannel + { + private readonly BlockingCollection _receiveBuffer = new BlockingCollection(); + + private readonly IMqttClientOptions _clientOptions; + private WebSocket4Net.WebSocket _webSocket; + + public WebSocket4NetMqttChannel(IMqttClientOptions clientOptions) + { + _clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions)); + } + + public string Endpoint { get; } = ""; + + public Task ConnectAsync(CancellationToken cancellationToken) + { + var channelOptions = (MqttClientWebSocketOptions)_clientOptions.ChannelOptions; + + var uri = "ws://" + channelOptions.Uri; + var sslProtocols = SslProtocols.None; + + if (channelOptions.TlsOptions.UseTls) + { + uri = "wss://" + channelOptions.Uri; + sslProtocols = SslProtocols.Tls12; + } + + var subProtocol = channelOptions.SubProtocols.FirstOrDefault() ?? string.Empty; + + _webSocket = new WebSocket4Net.WebSocket(uri, subProtocol, sslProtocols: sslProtocols); + _webSocket.DataReceived += OnDataReceived; + _webSocket.Open(); + SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Open, _clientOptions.CommunicationTimeout); + + return Task.FromResult(0); + } + + public Task DisconnectAsync() + { + if (_webSocket != null) + { + _webSocket.DataReceived -= OnDataReceived; + _webSocket.Close(); + SpinWait.SpinUntil(() => _webSocket.State == WebSocketState.Closed, _clientOptions.CommunicationTimeout); + } + + _webSocket = null; + + return Task.FromResult(0); + } + + public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var readBytes = 0; + while (count > 0 && !cancellationToken.IsCancellationRequested) + { + byte @byte; + if (readBytes == 0) + { + // Block until at lease one byte was received. + @byte = _receiveBuffer.Take(cancellationToken); + } + else + { + if (!_receiveBuffer.TryTake(out @byte)) + { + return Task.FromResult(readBytes); + } + } + + buffer[offset] = @byte; + offset++; + count--; + readBytes++; + } + + return Task.FromResult(readBytes); + } + + public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + _webSocket.Send(buffer, offset, count); + return Task.FromResult(0); + } + + public void Dispose() + { + if (_webSocket != null) + { + _webSocket.DataReceived -= OnDataReceived; + _webSocket.Dispose(); + } + } + + private void OnDataReceived(object sender, WebSocket4Net.DataReceivedEventArgs e) + { + foreach (var @byte in e.Data) + { + _receiveBuffer.Add(@byte); + } + } + } + } +} diff --git a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs index 1265a6a..e46ed16 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs @@ -3,6 +3,7 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Diagnostics; using MQTTnet.Protocol; using Newtonsoft.Json; @@ -12,7 +13,7 @@ namespace MQTTnet.TestApp.NetCore { public static async Task RunAsync() { - //MqttNetGlobalLogger.LogMessagePublished += (s, e) => Console.WriteLine(e.TraceMessage); + //MqttNetConsoleLogger.ForwardToConsole(); // iot.eclipse.org await ExecuteTestAsync("iot.eclipse.org TCP", @@ -35,7 +36,7 @@ namespace MQTTnet.TestApp.NetCore new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8080/mqtt").Build()); await ExecuteTestAsync("test.mosquitto.org WS TLS", - new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").Build()); + new MqttClientOptionsBuilder().WithWebSocketServer("test.mosquitto.org:8081/mqtt").WithTls().Build()); // broker.hivemq.com await ExecuteTestAsync("broker.hivemq.com TCP", @@ -78,7 +79,7 @@ namespace MQTTnet.TestApp.NetCore var topic = Guid.NewGuid().ToString(); MqttApplicationMessage receivedMessage = null; - client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage; + client.ApplicationMessageReceived += (s, e) => receivedMessage = e.ApplicationMessage; await client.ConnectAsync(options); await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce); From 54e3cd7cb722d51bffcbb0e2c7ea0cb1f630539a Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 8 Jul 2018 14:01:47 +0200 Subject: [PATCH 19/19] Update nuspecs. --- Build/MQTTnet.AspNetCore.nuspec | 5 +++-- Build/MQTTnet.Extensions.ManagedClient.nuspec | 5 +++-- Build/MQTTnet.Extensions.Rpc.nuspec | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index c9936f3..43f056c 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -10,12 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is a support library to integrate MQTTnet into AspNetCore. - * Updated to MQTTnet 2.8.0. + * Updated to MQTTnet 2.8.1. +* For more release notes please check the MQTTnet release notes. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index ba5333f..44114a5 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -10,12 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is an extension library which provides a managed MQTT client with additional features using MQTTnet. - * Updated to MQTTnet 2.8.0. + * Updated to MQTTnet 2.8.1. +* For more release notes please check the MQTTnet release notes. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index 0df8e7e..1c8bee7 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -10,12 +10,13 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false This is an extension library which allows executing synchronous device calls including a response using MQTTnet. - * Updated to MQTTnet 2.8.0. + * Updated to MQTTnet 2.8.1. +* For more release notes please check the MQTTnet release notes. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - +