From f76e13028316e3d1757d871bb9f200945a72583d Mon Sep 17 00:00:00 2001 From: vbBerni Date: Wed, 19 Jun 2019 16:33:54 +0200 Subject: [PATCH 01/11] fix for failed publishing messages get removed from the message queue but not from the storage manager --- Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 59f1c9a..b27bead 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -408,6 +408,7 @@ namespace MQTTnet.Extensions.ManagedClient { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } + _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); } } catch (Exception exception) From a9017b87959f60d9070cea4a9d8bed9de3a7088d Mon Sep 17 00:00:00 2001 From: vbBerni Date: Sat, 22 Jun 2019 22:31:48 +0200 Subject: [PATCH 02/11] handle storage manager null --- .../ManagedMqttClient.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index b27bead..dc57e40 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -387,7 +387,10 @@ namespace MQTTnet.Extensions.ManagedClient //removed it, in which case we don't want to do anything. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } - _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); + if (_storageManager != null) + { + _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + } } catch (MqttCommunicationException exception) { @@ -408,7 +411,10 @@ namespace MQTTnet.Extensions.ManagedClient { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } - _storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); + if (_storageManager != null) + { + _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + } } } catch (Exception exception) From 961a497798fcc0370adcb43eae863892262622a6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 11:32:09 +0200 Subject: [PATCH 03/11] Performance refactoring. --- .../SpanBasedMqttPacketBodyReader.cs | 9 +- .../Formatter/IMqttPacketBodyReader.cs | 6 +- .../MQTTnet/Formatter/MqttPacketBodyReader.cs | 92 ++++++++++--------- .../Formatter/V5/MqttV500PropertiesReader.cs | 6 +- .../MQTTnet.Benchmarks/SerializerBenchmark.cs | 2 +- Tests/MQTTnet.Core.Tests/Protocol_Tests.cs | 23 +++++ 6 files changed, 82 insertions(+), 56 deletions(-) create mode 100644 Tests/MQTTnet.Core.Tests/Protocol_Tests.cs diff --git a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs index 2b118f7..58edcf8 100644 --- a/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs +++ b/Source/MQTTnet.AspnetCore/SpanBasedMqttPacketBodyReader.cs @@ -9,6 +9,7 @@ namespace MQTTnet.AspNetCore public class SpanBasedMqttPacketBodyReader : IMqttPacketBodyReader { private ReadOnlyMemory _buffer; + private int _offset; public void SetBuffer(ReadOnlyMemory buffer) @@ -17,11 +18,11 @@ namespace MQTTnet.AspNetCore _offset = 0; } - public ulong Length => (ulong)_buffer.Length; + public int Length => _buffer.Length; public bool EndOfStream => _buffer.Length.Equals(_offset); - public ulong Offset => (ulong)_offset; + public int Offset => _offset; public byte ReadByte() { @@ -116,9 +117,9 @@ namespace MQTTnet.AspNetCore throw new MqttProtocolViolationException("Boolean values can be 0 or 1 only."); } - public void Seek(ulong position) + public void Seek(int position) { - _offset = (int)position; + _offset = position; } } } diff --git a/Source/MQTTnet/Formatter/IMqttPacketBodyReader.cs b/Source/MQTTnet/Formatter/IMqttPacketBodyReader.cs index 632c493..2e0f8f0 100644 --- a/Source/MQTTnet/Formatter/IMqttPacketBodyReader.cs +++ b/Source/MQTTnet/Formatter/IMqttPacketBodyReader.cs @@ -4,9 +4,9 @@ namespace MQTTnet.Formatter { public interface IMqttPacketBodyReader { - ulong Length { get; } + int Length { get; } - ulong Offset { get; } + int Offset { get; } bool EndOfStream { get; } @@ -26,6 +26,6 @@ namespace MQTTnet.Formatter bool ReadBoolean(); - void Seek(ulong position); + void Seek(int position); } } diff --git a/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs b/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs index c45fb3d..dece6f3 100644 --- a/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs +++ b/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs @@ -9,52 +9,54 @@ namespace MQTTnet.Formatter public class MqttPacketBodyReader : IMqttPacketBodyReader { private readonly byte[] _buffer; - private readonly ulong _initialOffset; - private readonly ulong _length; + private readonly int _initialOffset; + private readonly int _length; - public MqttPacketBodyReader(byte[] buffer, int offset, int length) - : this(buffer, (ulong)offset, (ulong)length) - { - } + private int _offset; - public MqttPacketBodyReader(byte[] buffer, ulong offset, ulong length) + public MqttPacketBodyReader(byte[] buffer, int offset, int length) { _buffer = buffer; _initialOffset = offset; - Offset = offset; + _offset = offset; _length = length; } - public ulong Offset { get; private set; } + public int Offset + { + get => _offset; + } - public ulong Length => _length - Offset; + public int Length => _length - _offset; - public bool EndOfStream => Offset == _length; + public bool EndOfStream => _offset == _length; - public void Seek(ulong position) + public void Seek(int position) { - Offset = _initialOffset + position; + _offset = _initialOffset + position; } - public ArraySegment Read(uint length) + public ArraySegment Read(int length) { ValidateReceiveBuffer(length); - var buffer = new ArraySegment(_buffer, (int)Offset, (int)length); - Offset += length; + var buffer = new ArraySegment(_buffer, (int)_offset, (int)length); + _offset += length; return buffer; } public byte ReadByte() { ValidateReceiveBuffer(1); - return _buffer[Offset++]; + + return _buffer[_offset++]; } public bool ReadBoolean() { ValidateReceiveBuffer(1); - var buffer = _buffer[Offset++]; + + var buffer = _buffer[_offset++]; if (buffer == 0) { @@ -71,15 +73,15 @@ namespace MQTTnet.Formatter public byte[] ReadRemainingData() { - return new ArraySegment(_buffer, (int)Offset, (int)(_length - Offset)).ToArray(); + return new ArraySegment(_buffer, (int)_offset, (int)(_length - _offset)).ToArray(); } public ushort ReadTwoByteInteger() { ValidateReceiveBuffer(2); - var msb = _buffer[Offset++]; - var lsb = _buffer[Offset++]; + var msb = _buffer[_offset++]; + var lsb = _buffer[_offset++]; return (ushort)(msb << 8 | lsb); } @@ -88,31 +90,14 @@ namespace MQTTnet.Formatter { ValidateReceiveBuffer(4); - var byte0 = _buffer[Offset++]; - var byte1 = _buffer[Offset++]; - var byte2 = _buffer[Offset++]; - var byte3 = _buffer[Offset++]; + var byte0 = _buffer[_offset++]; + var byte1 = _buffer[_offset++]; + var byte2 = _buffer[_offset++]; + var byte3 = _buffer[_offset++]; return (uint)(byte0 << 24 | byte1 << 16 | byte2 << 8 | byte3); } - public byte[] ReadWithLengthPrefix() - { - return ReadSegmentWithLengthPrefix().ToArray(); - } - - private ArraySegment ReadSegmentWithLengthPrefix() - { - var length = ReadTwoByteInteger(); - - ValidateReceiveBuffer(length); - - var result = new ArraySegment(_buffer, (int)Offset, length); - Offset += length; - - return result; - } - public uint ReadVariableLengthInteger() { var multiplier = 1; @@ -134,13 +119,30 @@ namespace MQTTnet.Formatter return value; } + + public byte[] ReadWithLengthPrefix() + { + return ReadSegmentWithLengthPrefix().ToArray(); + } + + private ArraySegment ReadSegmentWithLengthPrefix() + { + var length = ReadTwoByteInteger(); + + ValidateReceiveBuffer(length); + + var result = new ArraySegment(_buffer, (int)_offset, length); + _offset += length; + + return result; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void ValidateReceiveBuffer(uint length) + private void ValidateReceiveBuffer(int length) { - if (_length < Offset + length) + if (_length < _offset + length) { - throw new ArgumentOutOfRangeException(nameof(_buffer), $"Expected at least {Offset + length} bytes but there are only {_length} bytes"); + throw new ArgumentOutOfRangeException(nameof(_buffer), $"Expected at least {_offset + length} bytes but there are only {_length} bytes"); } } diff --git a/Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs b/Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs index df323ae..443cf17 100644 --- a/Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs +++ b/Source/MQTTnet/Formatter/V5/MqttV500PropertiesReader.cs @@ -9,8 +9,8 @@ namespace MQTTnet.Formatter.V5 public class MqttV500PropertiesReader { private readonly IMqttPacketBodyReader _body; - private readonly uint _length; - private readonly ulong _targetOffset; + private readonly int _length; + private readonly int _targetOffset; public MqttV500PropertiesReader(IMqttPacketBodyReader body) { @@ -18,7 +18,7 @@ namespace MQTTnet.Formatter.V5 if (!body.EndOfStream) { - _length = body.ReadVariableLengthInteger(); + _length = (int)body.ReadVariableLengthInteger(); } _targetOffset = body.Offset + _length; diff --git a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs index 7fbb552..51e7ecb 100644 --- a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -57,7 +57,7 @@ namespace MQTTnet.Benchmarks var receivedPacket = new ReceivedMqttPacket( header.Flags, - new MqttPacketBodyReader(_serializedPacket.Array, (ulong)(_serializedPacket.Count - header.RemainingLength), (ulong)_serializedPacket.Array.Length), 0); + new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength, _serializedPacket.Array.Length), 0); _serializer.Decode(receivedPacket); } diff --git a/Tests/MQTTnet.Core.Tests/Protocol_Tests.cs b/Tests/MQTTnet.Core.Tests/Protocol_Tests.cs new file mode 100644 index 0000000..41cd1fa --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/Protocol_Tests.cs @@ -0,0 +1,23 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Formatter; + +namespace MQTTnet.Tests +{ + [TestClass] + public class Protocol_Tests + { + [TestMethod] + public void Encode_Four_Byte_Integer() + { + for (uint i = 0; i < 268435455; i++) + { + var buffer = MqttPacketWriter.EncodeVariableLengthInteger(i); + var reader = new MqttPacketBodyReader(buffer.Array, buffer.Offset, buffer.Count); + + var checkValue = reader.ReadVariableLengthInteger(); + + Assert.AreEqual(i, checkValue); + } + } + } +} From c50fb740fc71b7921b3a56e1eb8346da78dffdf0 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 12:37:33 +0200 Subject: [PATCH 04/11] Add UnitTests. --- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 31f0847..d10ebaf 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -22,6 +22,27 @@ namespace MQTTnet.Tests [TestClass] public class Server_Tests { + [TestMethod] + public async Task Use_Empty_Client_ID() + { + using (var testEnvironment = new TestEnvironment()) + { + await testEnvironment.StartServerAsync(); + + var client = testEnvironment.CreateClient(); + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost", testEnvironment.ServerPort) + .WithClientId(string.Empty) + .Build(); + + var connectResult = await client.ConnectAsync(clientOptions); + + Assert.IsFalse(connectResult.IsSessionPresent); + Assert.IsTrue(client.IsConnected); + } + } + [TestMethod] public async Task Publish_At_Most_Once_0x00() { From 7b3be3a64f532b73607f92a556e4bf0fd1982457 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 12:37:41 +0200 Subject: [PATCH 05/11] Update docs. --- Build/MQTTnet.nuspec | 1 + 1 file changed, 1 insertion(+) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index f0e4c3d..3735636 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -15,6 +15,7 @@ * [Core] Added extension method to allow usage of _WebSocket4Net_ in clients to fix issues with AWS and Xamarin. * [Core] Fixed usage of wrong data type for passwords (string -> byte[]). * [Core] Fixed an _ObjectDisposedException_ when sending data using a WebSocket channel. +* [Core] Performance optimizations. * [Client] Added support for extended authentication exchange. * [Client] Exposed MQTTv5 CONNACK values to client. * [Client] Added _MqttClientSubscribeOptionsBuilder_. From 985c3084a88c014acc576680ec13ce6b30255c48 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 12:37:55 +0200 Subject: [PATCH 06/11] Performance refactoring. --- .../MQTTnet/Formatter/MqttPacketBodyReader.cs | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs b/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs index dece6f3..1deb1fe 100644 --- a/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs +++ b/Source/MQTTnet/Formatter/MqttPacketBodyReader.cs @@ -22,10 +22,7 @@ namespace MQTTnet.Formatter _length = length; } - public int Offset - { - get => _offset; - } + public int Offset => _offset; public int Length => _length - _offset; @@ -36,22 +33,13 @@ namespace MQTTnet.Formatter _offset = _initialOffset + position; } - public ArraySegment Read(int length) - { - ValidateReceiveBuffer(length); - - var buffer = new ArraySegment(_buffer, (int)_offset, (int)length); - _offset += length; - return buffer; - } - public byte ReadByte() { ValidateReceiveBuffer(1); return _buffer[_offset++]; } - + public bool ReadBoolean() { ValidateReceiveBuffer(1); @@ -73,7 +61,11 @@ namespace MQTTnet.Formatter public byte[] ReadRemainingData() { - return new ArraySegment(_buffer, (int)_offset, (int)(_length - _offset)).ToArray(); + var bufferLength = _length - _offset; + var buffer = new byte[bufferLength]; + Array.Copy(_buffer, _offset, buffer, 0, bufferLength); + + return buffer; } public ushort ReadTwoByteInteger() @@ -131,7 +123,7 @@ namespace MQTTnet.Formatter ValidateReceiveBuffer(length); - var result = new ArraySegment(_buffer, (int)_offset, length); + var result = new ArraySegment(_buffer, _offset, length); _offset += length; return result; From ce05cc9936d464dadf5f022d31090b411879e7cc Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 15:39:07 +0200 Subject: [PATCH 07/11] Fix MQTTnet.Server connection validation. --- Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs b/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs index 4e13c83..3b1a2fc 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerConnectionValidator.cs @@ -29,10 +29,10 @@ namespace MQTTnet.Server.Mqtt { "client_id", context.ClientId }, { "username", context.Username }, { "password", context.Password }, - { "raw_password", new Bytes(context.RawPassword) }, + { "raw_password", new Bytes(context.RawPassword ?? new byte[0]) }, { "clean_session", context.CleanSession}, { "authentication_method", context.AuthenticationMethod}, - { "authentication_data", new Bytes(context.AuthenticationData)}, + { "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) }, { "result", PythonConvert.Pythonfy(context.ReasonCode) } }; @@ -44,6 +44,8 @@ namespace MQTTnet.Server.Mqtt catch (Exception exception) { _logger.LogError(exception, "Error while validating client connection."); + + context.ReasonCode = MqttConnectReasonCode.UnspecifiedError; } return Task.CompletedTask; From 6f8272210c3765a5a57e187ac825573a5a25d458 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 15:39:50 +0200 Subject: [PATCH 08/11] Fix MQTTnet.Server app settings to allow external connections. --- Source/MQTTnet.Server/appsettings.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Server/appsettings.json b/Source/MQTTnet.Server/appsettings.json index 9a9ce0d..8ea10d6 100644 --- a/Source/MQTTnet.Server/appsettings.json +++ b/Source/MQTTnet.Server/appsettings.json @@ -2,10 +2,10 @@ "Kestrel": { "EndPoints": { "Http": { - "Url": "http://localhost:80" + "Url": "http://*:80" }, "Https": { - "Url": "http://localhost:443" + "Url": "http://*:443" } } }, From 6ee3d1640fe4b36e8ffc9094497571f0b460e6f5 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 15:57:12 +0200 Subject: [PATCH 09/11] Improve logging. --- Source/MQTTnet/Server/MqttClientConnection.cs | 5 ++--- Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs | 3 ++- Source/MQTTnet/Server/MqttClientSessionsManager.cs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 5ede2c6..ed378bb 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -7,6 +7,7 @@ using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Formatter; +using MQTTnet.Internal; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -134,9 +135,7 @@ namespace MQTTnet.Server Session.WillMessage = _connectPacket.WillMessage; -#pragma warning disable 4014 - Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token); -#pragma warning restore 4014 + Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token).Forget(_logger); // TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor. _keepAliveMonitor.Start(_connectPacket.KeepAlivePeriod, _cancellationToken.Token); diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index 32effed..deefe4c 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using MQTTnet.Diagnostics; +using MQTTnet.Internal; namespace MQTTnet.Server { @@ -32,7 +33,7 @@ namespace MQTTnet.Server return; } - Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).ConfigureAwait(false); + Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger); } public void Pause() diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index b18f414..c2d5637 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -47,7 +47,7 @@ namespace MQTTnet.Server public void Start() { - Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken); + Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger); } public async Task StopAsync() From dd96cf98a87b576b7f83dab71ae7da407fcac816 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 15:57:48 +0200 Subject: [PATCH 10/11] Make storage update for managed client async. --- .../ManagedMqttClient.cs | 57 +++++++++++-------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index dc57e40..74d148d 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -31,7 +31,7 @@ namespace MQTTnet.Extensions.ManagedClient private ManagedMqttClientStorageManager _storageManager; - private bool _disposed = false; + private bool _disposed; private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) @@ -102,9 +102,8 @@ namespace MQTTnet.Extensions.ManagedClient _connectionCancellationToken = new CancellationTokenSource(); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + _maintainConnectionTask.Forget(_logger); _logger.Info("Started"); } @@ -333,20 +332,20 @@ namespace MQTTnet.Extensions.ManagedClient } } - private void PublishQueuedMessages(CancellationToken cancellationToken) + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try { while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) { - //Peek at the message without dequeueing in order to prevent the - //possibility of the queue growing beyond the configured cap. - //Previously, messages could be re-enqueued if there was an - //exception, and this re-enqueueing did not honor the cap. - //Furthermore, because re-enqueueing would shuffle the order - //of the messages, the DropOldestQueuedMessage strategy would - //be unable to know which message is actually the oldest and would - //instead drop the first item in the queue. + // Peek at the message without dequeueing in order to prevent the + // possibility of the queue growing beyond the configured cap. + // Previously, messages could be re-enqueued if there was an + // exception, and this re-enqueueing did not honor the cap. + // Furthermore, because re-enqueueing would shuffle the order + // of the messages, the DropOldestQueuedMessage strategy would + // be unable to know which message is actually the oldest and would + // instead drop the first item in the queue. var message = _messageQueue.PeekAndWait(); if (message == null) { @@ -355,7 +354,7 @@ namespace MQTTnet.Extensions.ManagedClient cancellationToken.ThrowIfCancellationRequested(); - TryPublishQueuedMessage(message); + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -371,25 +370,27 @@ namespace MQTTnet.Extensions.ManagedClient } } - private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message) + private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message) { Exception transmitException = null; try { - _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult(); + await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); + lock (_messageQueue) //lock to avoid conflict with this.PublishAsync { - //While publishing this message, this.PublishAsync could have booted this - //message off the queue to make room for another (when using a cap - //with the DropOldestQueuedMessage strategy). If the first item - //in the queue is equal to this message, then it's safe to remove - //it from the queue. If not, that means this.PublishAsync has already - //removed it, in which case we don't want to do anything. + // While publishing this message, this.PublishAsync could have booted this + // message off the queue to make room for another (when using a cap + // with the DropOldestQueuedMessage strategy). If the first item + // in the queue is equal to this message, then it's safe to remove + // it from the queue. If not, that means this.PublishAsync has already + // removed it, in which case we don't want to do anything. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } + if (_storageManager != null) { - _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } catch (MqttCommunicationException exception) @@ -411,9 +412,10 @@ namespace MQTTnet.Extensions.ManagedClient { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); } + if (_storageManager != null) { - _storageManager.RemoveAsync(message).GetAwaiter().GetResult(); + await _storageManager.RemoveAsync(message).ConfigureAwait(false); } } } @@ -424,7 +426,12 @@ namespace MQTTnet.Extensions.ManagedClient } finally { - ApplicationMessageProcessedHandler?.HandleApplicationMessageProcessedAsync(new ApplicationMessageProcessedEventArgs(message, transmitException)).GetAwaiter().GetResult(); + var eventHandler = ApplicationMessageProcessedHandler; + if (eventHandler != null) + { + var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException); + await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false); + } } } @@ -509,7 +516,7 @@ namespace MQTTnet.Extensions.ManagedClient var cts = new CancellationTokenSource(); _publishingCancellationToken = cts; - Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger); } private void StopPublishing() From 7a767c7d2fb8ffbb9d537cfbf4e91ae33257adcb Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 23 Jun 2019 15:58:21 +0200 Subject: [PATCH 11/11] Improve logging and exception handling when stopping the server. --- .../MQTTnet.Server/Mqtt/MqttServerService.cs | 5 +- .../Implementations/MqttTcpServerAdapter.cs | 22 +++--- .../Implementations/MqttTcpServerListener.cs | 71 ++++++++++++++----- 3 files changed, 69 insertions(+), 29 deletions(-) diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs index 6323738..efb0010 100644 --- a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs +++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs @@ -66,7 +66,10 @@ namespace MQTTnet.Server.Mqtt var adapters = new List { - new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()), + new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()) + { + TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services. + }, _webSocketServerAdapter }; diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 900ba96..0e28ad0 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -28,6 +28,8 @@ namespace MQTTnet.Implementations public Func ClientHandler { get; set; } + public bool TreatSocketOpeningErrorAsWarning { get; set; } + public Task StartAsync(IMqttServerOptions options) { if (_cancellationTokenSource != null) throw new InvalidOperationException("Server is already started."); @@ -36,7 +38,7 @@ namespace MQTTnet.Implementations if (options.DefaultEndpointOptions.IsEnabled) { - RegisterListeners(options.DefaultEndpointOptions, null); + RegisterListeners(options.DefaultEndpointOptions, null, _cancellationTokenSource.Token); } if (options.TlsEndpointOptions.IsEnabled) @@ -52,7 +54,7 @@ namespace MQTTnet.Implementations throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); } - RegisterListeners(options.TlsEndpointOptions, tlsCertificate); + RegisterListeners(options.TlsEndpointOptions, tlsCertificate, _cancellationTokenSource.Token); } return Task.FromResult(0); @@ -78,7 +80,7 @@ namespace MQTTnet.Implementations _listeners.Clear(); } - private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate) + private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) { if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) { @@ -86,14 +88,15 @@ namespace MQTTnet.Implementations AddressFamily.InterNetwork, options, tlsCertificate, - _cancellationTokenSource.Token, _logger) { ClientHandler = OnClientAcceptedAsync }; - listenerV4.Start(); - _listeners.Add(listenerV4); + if (listenerV4.Start(TreatSocketOpeningErrorAsWarning, cancellationToken)) + { + _listeners.Add(listenerV4); + } } if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None)) @@ -102,14 +105,15 @@ namespace MQTTnet.Implementations AddressFamily.InterNetworkV6, options, tlsCertificate, - _cancellationTokenSource.Token, _logger) { ClientHandler = OnClientAcceptedAsync }; - listenerV6.Start(); - _listeners.Add(listenerV6); + if (listenerV6.Start(TreatSocketOpeningErrorAsWarning, cancellationToken)) + { + _listeners.Add(listenerV6); + } } } diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index fe03c73..fac7376 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Formatter; +using MQTTnet.Internal; using MQTTnet.Server; namespace MQTTnet.Implementations @@ -17,24 +18,23 @@ namespace MQTTnet.Implementations public class MqttTcpServerListener : IDisposable { private readonly IMqttNetChildLogger _logger; - private readonly CancellationToken _cancellationToken; private readonly AddressFamily _addressFamily; private readonly MqttServerTcpEndpointBaseOptions _options; private readonly MqttServerTlsTcpEndpointOptions _tlsOptions; private readonly X509Certificate2 _tlsCertificate; + private Socket _socket; + private IPEndPoint _localEndPoint; public MqttTcpServerListener( AddressFamily addressFamily, MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, - CancellationToken cancellationToken, IMqttNetChildLogger logger) { _addressFamily = addressFamily; _options = options; _tlsCertificate = tlsCertificate; - _cancellationToken = cancellationToken; _logger = logger.CreateChildLogger(nameof(MqttTcpServerListener)); if (_options is MqttServerTlsTcpEndpointOptions tlsOptions) @@ -45,21 +45,38 @@ namespace MQTTnet.Implementations public Func ClientHandler { get; set; } - public void Start() + public bool Start(bool treatErrorsAsWarning, CancellationToken cancellationToken) { - var boundIp = _options.BoundInterNetworkAddress; - if (_addressFamily == AddressFamily.InterNetworkV6) + try { - boundIp = _options.BoundInterNetworkV6Address; - } + var boundIp = _options.BoundInterNetworkAddress; + if (_addressFamily == AddressFamily.InterNetworkV6) + { + boundIp = _options.BoundInterNetworkV6Address; + } + + _localEndPoint = new IPEndPoint(boundIp, _options.Port); - _socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); - _socket.Bind(new IPEndPoint(boundIp, _options.Port)); + _logger.Info($"Starting TCP listener for {_localEndPoint} TLS={_tlsCertificate != null}."); - _logger.Info($"Starting TCP listener for {_socket.LocalEndPoint} TLS={_tlsCertificate != null}."); + _socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); + _socket.Bind(_localEndPoint); + _socket.Listen(_options.ConnectionBacklog); - _socket.Listen(_options.ConnectionBacklog); - Task.Run(() => AcceptClientConnectionsAsync(_cancellationToken), _cancellationToken); + Task.Run(() => AcceptClientConnectionsAsync(cancellationToken), cancellationToken).Forget(_logger); + + return true; + } + catch (Exception exception) + { + if (!treatErrorsAsWarning) + { + throw; + } + + _logger.Warning(exception,"Error while creating listener socket for local end point '{0}'.", _localEndPoint); + return false; + } } public void Dispose() @@ -82,13 +99,29 @@ namespace MQTTnet.Implementations #else var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); #endif -#pragma warning disable 4014 - Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken); -#pragma warning restore 4014 + + if (clientSocket == null) + { + continue; + } + + Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken).Forget(_logger); + } + catch (OperationCanceledException) + { } catch (Exception exception) { - _logger.Error(exception, $"Error while accepting connection at TCP listener {_socket.LocalEndPoint} TLS={_tlsCertificate != null}."); + if (exception is SocketException socketException) + { + if (socketException.SocketErrorCode == SocketError.ConnectionAborted || + socketException.SocketErrorCode == SocketError.OperationAborted) + { + continue; + } + } + + _logger.Error(exception, $"Error while accepting connection at TCP listener {_localEndPoint} TLS={_tlsCertificate != null}."); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } @@ -105,7 +138,7 @@ namespace MQTTnet.Implementations _logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.", remoteEndPoint, - _socket.LocalEndPoint, + _localEndPoint, _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); clientSocket.NoDelay = _options.NoDelay; @@ -163,7 +196,7 @@ namespace MQTTnet.Implementations _logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.", remoteEndPoint, - _socket.LocalEndPoint, + _localEndPoint, _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); } catch (Exception disposeException)