From bc20850fba2fbf13e9d60d748c2dd30ad2278c17 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 19 Jun 2018 20:52:31 +0200 Subject: [PATCH] Refactor serializer locking and thread instances. --- Build/MQTTnet.AspNetCore.nuspec | 6 ++-- Build/MQTTnet.Extensions.ManagedClient.nuspec | 19 +--------- Build/MQTTnet.Extensions.Rpc.nuspec | 19 +--------- Build/MQTTnet.nuspec | 2 +- Build/build.ps1 | 8 ++--- Build/upload.ps1 | 9 +++++ MQTTnet.sln | 2 ++ .../ManagedMqttClient.cs | 35 ++++++++----------- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 7 ++++ .../MQTTnet/Diagnostics/MqttNetChildLogger.cs | 2 +- .../Implementations/MqttTcpChannel.Uwp.cs | 22 +++++++++--- Source/MQTTnet/MQTTnet.csproj | 5 ++- .../Serializer/MqttPacketSerializer.cs | 35 ++++++------------- Source/MQTTnet/Serializer/MqttPacketWriter.cs | 6 ++-- .../Server/MqttClientSessionsManager.cs | 2 +- .../MainPage.xaml | 31 ++++++++++------ .../MainPage.xaml.cs | 20 ++++++++++- 17 files changed, 120 insertions(+), 110 deletions(-) create mode 100644 Build/upload.ps1 diff --git a/Build/MQTTnet.AspNetCore.nuspec b/Build/MQTTnet.AspNetCore.nuspec index 9f59562..669f205 100644 --- a/Build/MQTTnet.AspNetCore.nuspec +++ b/Build/MQTTnet.AspNetCore.nuspec @@ -13,11 +13,9 @@ * Updated to MQTTnet 2.8.0. Copyright Christian Kratky 2016-2018 - MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin + MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - - - + diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index 714ed9e..6d7f7f6 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -15,24 +15,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - - - - - - - - - - - - - - - - - - + diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index 16a51c2..d7eddb0 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -15,24 +15,7 @@ Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin - - - - - - - - - - - - - - - - - - + diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 5d58437..556957d 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -48,7 +48,7 @@ - + diff --git a/Build/build.ps1 b/Build/build.ps1 index 59f3112..0b6f074 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -36,8 +36,8 @@ if ($path) { Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue New-Item -ItemType Directory -Force -Path .\NuGet - .\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion - .\NuGet.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion - .\NuGet.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion - .\NuGet.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion + .\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion + .\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion + .\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion + .\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion } \ No newline at end of file diff --git a/Build/upload.ps1 b/Build/upload.ps1 new file mode 100644 index 0000000..a7cb172 --- /dev/null +++ b/Build/upload.ps1 @@ -0,0 +1,9 @@ +param([string]$apiKey) + +$files = Get-ChildItem -Path ".\NuGet" -Filter "*.nupkg" +foreach ($file in $files) +{ + Write-Host "Uploading: " $file + + .\nuget.exe push $file.Fullname $apiKey -NoSymbols -Source https://api.nuget.org/v3/index.json +} \ No newline at end of file diff --git a/MQTTnet.sln b/MQTTnet.sln index 0e0e60c..a46c295 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -20,6 +20,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec Build\MQTTnet.nuspec = Build\MQTTnet.nuspec + Build\upload.ps1 = Build\upload.ps1 EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" @@ -74,6 +75,7 @@ Global {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.Deploy.0 = Debug|Any CPU {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.ActiveCfg = Debug|ARM {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Build.0 = Debug|ARM {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Deploy.0 = Debug|ARM diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 9bd14b2..687b0cd 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; -using MQTTnet.Internal; using MQTTnet.Protocol; namespace MQTTnet.Extensions.ManagedClient @@ -15,8 +14,7 @@ namespace MQTTnet.Extensions.ManagedClient public class ManagedMqttClient : IManagedMqttClient { private readonly BlockingCollection _messageQueue = new BlockingCollection(); - private readonly Dictionary _subscriptions = new Dictionary(); - private readonly AsyncLock _subscriptionsLock = new AsyncLock(); + private readonly ConcurrentDictionary _subscriptions = new ConcurrentDictionary(); private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; @@ -118,39 +116,36 @@ namespace MQTTnet.Extensions.ManagedClient _messageQueue.Add(applicationMessage); } - public async Task SubscribeAsync(IEnumerable topicFilters) + public Task SubscribeAsync(IEnumerable topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + foreach (var topicFilter in topicFilters) { - foreach (var topicFilter in topicFilters) - { - _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; - _subscriptionsNotPushed = true; - } + _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + _subscriptionsNotPushed = true; } + + return Task.FromResult(0); } - public async Task UnsubscribeAsync(IEnumerable topics) + public Task UnsubscribeAsync(IEnumerable topics) { - using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + foreach (var topic in topics) { - foreach (var topic in topics) + if (_subscriptions.TryRemove(topic, out _)) { - if (_subscriptions.Remove(topic)) - { - _unsubscriptions.Add(topic); - _subscriptionsNotPushed = true; - } + _unsubscriptions.Add(topic); + _subscriptionsNotPushed = true; } } + + return Task.FromResult(0); } public void Dispose() { _messageQueue?.Dispose(); - _subscriptionsLock?.Dispose(); _connectionCancellationToken?.Dispose(); _publishingCancellationToken?.Dispose(); } @@ -289,7 +284,7 @@ namespace MQTTnet.Extensions.ManagedClient List subscriptions; List unsubscriptions; - using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + lock (_subscriptions) { subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 94e0f33..61912ef 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -18,6 +18,8 @@ namespace MQTTnet.Adapter private const uint ErrorOperationAborted = 0x800703E3; private const int ReadBufferSize = 4096; // TODO: Move buffer size to config + private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); + private readonly IMqttNetChildLogger _logger; private readonly IMqttChannel _channel; @@ -88,6 +90,7 @@ namespace MQTTnet.Adapter public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { + await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { _logger.Verbose("TX >>> {0}", packet); @@ -107,6 +110,10 @@ namespace MQTTnet.Adapter WrapException(exception); } + finally + { + _writerSemaphore.Release(); + } } public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) diff --git a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs b/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs index 1ae5a9c..3733454 100644 --- a/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs +++ b/Source/MQTTnet/Diagnostics/MqttNetChildLogger.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Diagnostics public MqttNetChildLogger(IMqttNetLogger logger, string source) { - _logger = logger; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _source = source; } diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs index d7a030a..2ccc7d9 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.Uwp.cs @@ -40,7 +40,18 @@ namespace MQTTnet.Implementations public static Func> CustomIgnorableServerCertificateErrorsResolver { get; set; } - public string Endpoint => _socket?.Information?.RemoteAddress?.ToString(); // TODO: Check if contains also the port. + public string Endpoint + { + get + { + if (_socket?.Information != null) + { + return _socket.Information.RemoteAddress + ":" + _socket.Information.RemotePort; + } + + return null; + } + } public async Task ConnectAsync(CancellationToken cancellationToken) { @@ -81,10 +92,13 @@ namespace MQTTnet.Implementations return _readStream.ReadAsync(buffer, offset, count, cancellationToken); } - public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _writeStream.WriteAsync(buffer, offset, count, cancellationToken); - await _writeStream.FlushAsync(cancellationToken); + // In the write method only the internal buffer will be filled. So here is no + // async/await required. The real network transmit is done when calling the + // Flush method. + _writeStream.Write(buffer, offset, count); + return _writeStream.FlushAsync(cancellationToken); } public void Dispose() diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 4afd09d..8364378 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -1,7 +1,7 @@  - netstandard1.3;netstandard2.0;net452;uap10.0 + netstandard1.3;netstandard2.0;net452;net461;uap10.0 netstandard1.3;netstandard2.0 MQTTnet MQTTnet @@ -62,4 +62,7 @@ + + + \ No newline at end of file diff --git a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs index 159ba89..537513d 100644 --- a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs +++ b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs @@ -11,8 +11,7 @@ namespace MQTTnet.Serializer { private const int FixedHeaderSize = 1; - [ThreadStatic] - private static MqttPacketWriter _packetWriter; + private readonly MqttPacketWriter _packetWriter = new MqttPacketWriter(); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; @@ -20,14 +19,12 @@ namespace MQTTnet.Serializer { if (packet == null) throw new ArgumentNullException(nameof(packet)); - var packetWriter = InitializePacketWriter(); - // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) - packetWriter.Reset(); - packetWriter.Seek(5); + _packetWriter.Reset(); + _packetWriter.Seek(5); - var fixedHeader = SerializePacket(packet, packetWriter); - var remainingLength = packetWriter.Length - 5; + var fixedHeader = SerializePacket(packet, _packetWriter); + var remainingLength = _packetWriter.Length - 5; var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength); @@ -35,12 +32,12 @@ namespace MQTTnet.Serializer var headerOffset = 5 - headerSize; // Position cursor on correct offset on beginining of array (has leading 0x0) - packetWriter.Seek(headerOffset); - packetWriter.Write(fixedHeader); - packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); + _packetWriter.Seek(headerOffset); + _packetWriter.Write(fixedHeader); + _packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); - var buffer = packetWriter.GetBuffer(); - return new ArraySegment(buffer, headerOffset, packetWriter.Length - headerOffset); + var buffer = _packetWriter.GetBuffer(); + return new ArraySegment(buffer, headerOffset, _packetWriter.Length - headerOffset); } public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket) @@ -76,7 +73,7 @@ namespace MQTTnet.Serializer public void FreeBuffer() { - InitializePacketWriter().FreeBuffer(); + _packetWriter.FreeBuffer(); } private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter packetWriter) @@ -101,16 +98,6 @@ namespace MQTTnet.Serializer } } - private static MqttPacketWriter InitializePacketWriter() - { - if (_packetWriter == null) - { - _packetWriter = new MqttPacketWriter(); - } - - return _packetWriter; - } - private static MqttBasePacket DeserializeUnsubAck(MqttPacketBodyReader body) { ThrowIfBodyIsEmpty(body); diff --git a/Source/MQTTnet/Serializer/MqttPacketWriter.cs b/Source/MQTTnet/Serializer/MqttPacketWriter.cs index a2c6f5b..ba535bf 100644 --- a/Source/MQTTnet/Serializer/MqttPacketWriter.cs +++ b/Source/MQTTnet/Serializer/MqttPacketWriter.cs @@ -12,6 +12,8 @@ namespace MQTTnet.Serializer /// public class MqttPacketWriter { + public static int MaxBufferSize = 4096; + private byte[] _buffer = new byte[128]; private int _position; @@ -117,12 +119,12 @@ namespace MQTTnet.Serializer // a lot and the size will never reduced. So this method tries to find a size which can be held in // memory for a long time without causing troubles. - if (_buffer.Length < 4096) + if (_buffer.Length < MaxBufferSize) { return; } - Array.Resize(ref _buffer, 4096); + Array.Resize(ref _buffer, MaxBufferSize); } private void EnsureAdditionalCapacity(int additionalCapacity) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 1cb301a..5937265 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -148,7 +148,7 @@ namespace MQTTnet.Server clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage); } } - catch (TaskCanceledException) + catch (OperationCanceledException) { } catch (Exception exception) diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index 906d9b8..41617df 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -4,6 +4,8 @@ xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:d="http://schemas.microsoft.com/expression/blend/2008" xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" + xmlns:server="using:MQTTnet.Server" + xmlns:interop="using:Windows.UI.Xaml.Interop" d:DesignHeight="800" d:DesignWidth="800" mc:Ignorable="d"> @@ -31,7 +33,7 @@ Keep alive interval: - + TCP WS @@ -142,6 +144,7 @@ Persist retained messages in JSON format Clear previously retained messages on startup + Allow persistent sessions @@ -149,11 +152,6 @@ - - - - - @@ -162,12 +160,23 @@ - + - - - - + + + + + + + + + + + + + diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 76f947d..387364a 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.ObjectModel; using System.Text; using System.Threading.Tasks; using Windows.Security.Cryptography.Certificates; @@ -21,6 +22,7 @@ namespace MQTTnet.TestApp.UniversalWindows public sealed partial class MainPage { private readonly ConcurrentQueue _traceMessages = new ConcurrentQueue(); + private readonly ObservableCollection _sessions = new ObservableCollection(); private IMqttClient _mqttClient; private IMqttServer _mqttServer; @@ -306,6 +308,7 @@ namespace MQTTnet.TestApp.UniversalWindows var options = new MqttServerOptions(); options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); options.Storage = storage; + options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true; await _mqttServer.StartAsync(options); } @@ -374,10 +377,25 @@ namespace MQTTnet.TestApp.UniversalWindows private void ClearSessions(object sender, RoutedEventArgs e) { + _sessions.Clear(); } - private void RefreshSessions(object sender, RoutedEventArgs e) + private async void RefreshSessions(object sender, RoutedEventArgs e) { + if (_mqttServer == null) + { + return; + } + + var sessions = await _mqttServer.GetClientSessionsStatusAsync(); + _sessions.Clear(); + + foreach (var session in sessions) + { + _sessions.Add(session); + } + + ListViewSessions.DataContext = _sessions; } private async Task WikiCode()