diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 7823d38..fe60e58 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.7.0 + 2.7.1 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE @@ -10,16 +10,10 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Core] Fixed some still thread blocking parts in the code (thanks to @kpreisser). -* [Core] Updated 3rd-Party packages. -* [Core] Fixed wrong packet identifier calculation (thanks to @benpittoors). -* [Core] Fixed an issue when reading the body of a package from a disconnected sender (thanks to @kpreisser). -* [Core] Fixed wrong parsing of the body length (thanks to @kpreisser). -* [Client] The client interfaces are now implementing _IDisposable_. -* [Client] The disconnected event now contains the exception which was thrown and causing the disconnect. -* [Server] Fixed an issue which lets the server block 1 second after accepting a new connection (thanks to @kpreisser). -* [Server] The server now allows managing client subscriptions. -* [Server] Added events for topic subscriptions. + * [Client] The _ManagedClient_ now has an event which is fired after a queued application message was processed (including exception). +* [Client] The _ManagedClient_ now supports unsubscribing (thanks to @lerppana) +* [Server] Fixed some minor async issues. +* [Server] Fixed wrong comparison of the topic and QoS for retained messages. Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Build/build.ps1 b/Build/build.ps1 index 2b4ee5f..021ca1d 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -1,6 +1,7 @@ param([string]$assemblyVersion, [string]$nugetVersion) -if ([string]::IsNullOrEmpty($version)) {$version = "0.0.1"} +if ([string]::IsNullOrEmpty($assemblyVersion)) {$assemblyVersion = "0.0.1"} +if ([string]::IsNullOrEmpty($nugetVersion)) {$nugetVersion = "0.0.1"} $vswhere = ${Env:\ProgramFiles(x86)} + '\Microsoft Visual Studio\Installer\vswhere' @@ -9,14 +10,14 @@ if ($path) { $msbuild = join-path $path 'MSBuild\15.0\Bin\MSBuild.exe' # Build the core library - &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m + &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" # Build the ASP.NET Core 2.0 extension - &$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m + &$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" # Build the RPC extension &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m @@ -25,7 +26,8 @@ if ($path) { &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - Remove-Item .\NuGet -Force -Recurse + Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue + New-Item -ItemType Directory -Force -Path .\NuGet .\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion .\NuGet.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion diff --git a/Build/codeSigningKey.pfx b/Build/codeSigningKey.pfx new file mode 100644 index 0000000..9374a5d Binary files /dev/null and b/Build/codeSigningKey.pfx differ diff --git a/Documents/Import_CodeSigningKey.md b/Documents/Import_CodeSigningKey.md index 66eb41f..7e966e4 100644 --- a/Documents/Import_CodeSigningKey.md +++ b/Documents/Import_CodeSigningKey.md @@ -3,4 +3,7 @@ In order to import the key for code signing on a new developer machine use the f > sn –i codeSigningKey.pfx VS_KEY_EFCA4C5B6DFD4B4F -The container name may be different. \ No newline at end of file +The container name may be different. + +# Check if the assembly has a strong name +> sn -vf MQTTnet.dll \ No newline at end of file diff --git a/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj index fd16072..3f4128e 100644 --- a/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj +++ b/Extensions/MQTTnet.Extensions.Rpc/MQTTnet.Extensions.Rpc.csproj @@ -20,6 +20,7 @@ .NETCore v5.0 $(DefineConstants);WINDOWS_UWP + en $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 70415cd..cb154c2 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -15,8 +15,7 @@ - true - codeSigningKey.pfx + false false @@ -29,6 +28,7 @@ .NETCore v5.0 $(DefineConstants);WINDOWS_UWP + en $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs new file mode 100644 index 0000000..fa82c0a --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs @@ -0,0 +1,19 @@ +using System; + +namespace MQTTnet.ManagedClient +{ + public class ApplicationMessageProcessedEventArgs : EventArgs + { + public ApplicationMessageProcessedEventArgs(MqttApplicationMessage applicationMessage, Exception exception) + { + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + Exception = exception; + } + + public MqttApplicationMessage ApplicationMessage { get; } + public Exception Exception { get; } + + public bool HasFailed => Exception != null; + public bool HasSucceeded => Exception == null; + } +} diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs index 9d67908..6f435d5 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs @@ -12,6 +12,8 @@ namespace MQTTnet.ManagedClient event EventHandler Connected; event EventHandler Disconnected; + event EventHandler ApplicationMessageProcessed; + Task StartAsync(IManagedMqttClientOptions options); Task StopAsync(); diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index a7f5a1c..5a392c3 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly Dictionary _subscriptions = new Dictionary(); private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); + private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; private readonly IMqttNetLogger _logger; @@ -27,7 +28,7 @@ namespace MQTTnet.ManagedClient private IManagedMqttClientOptions _options; private bool _subscriptionsNotPushed; - + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -43,6 +44,7 @@ namespace MQTTnet.ManagedClient public event EventHandler Connected; public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; + public event EventHandler ApplicationMessageProcessed; public async Task StartAsync(IManagedMqttClientOptions options) { @@ -57,7 +59,7 @@ namespace MQTTnet.ManagedClient if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); _options = options; - + if (_options.Storage != null) { _storageManager = new ManagedMqttClientStorageManager(_options.Storage); @@ -65,9 +67,9 @@ namespace MQTTnet.ManagedClient } _connectionCancellationToken = new CancellationTokenSource(); - + #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false); + Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token).ConfigureAwait(false), _connectionCancellationToken.Token).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _logger.Info("Started"); @@ -85,7 +87,7 @@ namespace MQTTnet.ManagedClient return Task.FromResult(0); } - + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); @@ -96,7 +98,7 @@ namespace MQTTnet.ManagedClient { await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); } - + _messageQueue.Add(applicationMessage); } } @@ -129,6 +131,7 @@ namespace MQTTnet.ManagedClient { if (_subscriptions.Remove(topic)) { + _unsubscriptions.Add(topic); _subscriptionsNotPushed = true; } } @@ -153,7 +156,7 @@ namespace MQTTnet.ManagedClient { while (!cancellationToken.IsCancellationRequested) { - await TryMaintainConnectionAsync(cancellationToken); + await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -184,11 +187,11 @@ namespace MQTTnet.ManagedClient if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) { - await PushSubscriptionsAsync().ConfigureAwait(false); + await SynchronizeSubscriptionsAsync().ConfigureAwait(false); StartPublishing(); - + return; } @@ -209,7 +212,7 @@ namespace MQTTnet.ManagedClient _logger.Error(exception, "Unhandled exception while maintaining connection."); } } - + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try @@ -227,7 +230,7 @@ namespace MQTTnet.ManagedClient continue; } - await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -245,6 +248,7 @@ namespace MQTTnet.ManagedClient private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) { + Exception transmitException = null; try { await _mqttClient.PublishAsync(message).ConfigureAwait(false); @@ -256,6 +260,8 @@ namespace MQTTnet.ManagedClient } catch (MqttCommunicationException exception) { + transmitException = exception; + _logger.Warning(exception, "Publishing application message failed."); if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) @@ -265,27 +271,38 @@ namespace MQTTnet.ManagedClient } catch (Exception exception) { + transmitException = exception; _logger.Error(exception, "Unhandled exception while publishing queued application message."); } + finally + { + ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); + } } - private async Task PushSubscriptionsAsync() + private async Task SynchronizeSubscriptionsAsync() { _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; + List unsubscriptions; + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); try { subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); + + unsubscriptions = new List(_unsubscriptions); + _unsubscriptions.Clear(); + _subscriptionsNotPushed = false; } finally { _subscriptionsSemaphore.Release(); } - - if (!subscriptions.Any()) + + if (!subscriptions.Any() && !unsubscriptions.Any()) { return; } @@ -293,6 +310,7 @@ namespace MQTTnet.ManagedClient try { await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); + await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false); } catch (Exception exception) { diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index c0e9e0c..91e5b8b 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -145,16 +145,16 @@ namespace MQTTnet.Server PendingMessagesQueue.Enqueue(publishPacket); } - public Task SubscribeAsync(IList topicFilters) + public async Task SubscribeAsync(IList topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - var response = SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket + await SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket { TopicFilters = topicFilters - }); + }).ConfigureAwait(false); - return response; + await EnqueueSubscribedRetainedMessagesAsync(topicFilters).ConfigureAwait(false); } public Task UnsubscribeAsync(IList topicFilters) @@ -270,7 +270,7 @@ namespace MQTTnet.Server await StopAsync().ConfigureAwait(false); } - await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); + await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false); } private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) @@ -279,12 +279,12 @@ namespace MQTTnet.Server await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); } - private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket) + private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) { - var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket); - foreach (var publishPacket in retainedMessages) + var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters); + foreach (var applicationMessage in retainedMessages) { - await EnqueueApplicationMessageAsync(publishPacket); + await EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false); } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index d580dd2..a9e68bc 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -220,7 +220,8 @@ namespace MQTTnet.Server private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) { - if (_options.ApplicationMessageInterceptor == null) + var interceptor = _options.ApplicationMessageInterceptor; + if (interceptor == null) { return applicationMessage; } @@ -229,7 +230,7 @@ namespace MQTTnet.Server senderClientSession.ClientId, applicationMessage); - _options.ApplicationMessageInterceptor(interceptorContext); + interceptor(interceptorContext); return interceptorContext.ApplicationMessage; } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index 7b90209..11cac24 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MQTTnet.Diagnostics; -using MQTTnet.Packets; namespace MQTTnet.Server { @@ -68,7 +67,7 @@ namespace MQTTnet.Server } } - public async Task> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket) + public async Task> GetSubscribedMessagesAsync(ICollection topicFilters) { var retainedMessages = new List(); @@ -77,13 +76,8 @@ namespace MQTTnet.Server { foreach (var retainedMessage in _retainedMessages.Values) { - foreach (var topicFilter in subscribePacket.TopicFilters) + foreach (var topicFilter in topicFilters) { - if (retainedMessage.QualityOfServiceLevel < topicFilter.QualityOfServiceLevel) - { - continue; - } - if (!MqttTopicFilterComparer.IsMatch(retainedMessage.Topic, topicFilter.Topic)) { continue; diff --git a/Frameworks/MQTTnet.Netstandard/MQTTnet.NetStandard.csproj b/Frameworks/MQTTnet.Netstandard/MQTTnet.NetStandard.csproj index 70415cd..cb154c2 100644 --- a/Frameworks/MQTTnet.Netstandard/MQTTnet.NetStandard.csproj +++ b/Frameworks/MQTTnet.Netstandard/MQTTnet.NetStandard.csproj @@ -15,8 +15,7 @@ - true - codeSigningKey.pfx + false false @@ -29,6 +28,7 @@ .NETCore v5.0 $(DefineConstants);WINDOWS_UWP + en $(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets