@@ -2,7 +2,7 @@ | |||||
<package > | <package > | ||||
<metadata> | <metadata> | ||||
<id>MQTTnet</id> | <id>MQTTnet</id> | ||||
<version>2.7.0</version> | |||||
<version>2.7.1</version> | |||||
<authors>Christian Kratky</authors> | <authors>Christian Kratky</authors> | ||||
<owners>Christian Kratky</owners> | <owners>Christian Kratky</owners> | ||||
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> | <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> | ||||
@@ -10,16 +10,10 @@ | |||||
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | <iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | ||||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | <requireLicenseAcceptance>false</requireLicenseAcceptance> | ||||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | ||||
<releaseNotes>* [Core] Fixed some still thread blocking parts in the code (thanks to @kpreisser). | |||||
* [Core] Updated 3rd-Party packages. | |||||
* [Core] Fixed wrong packet identifier calculation (thanks to @benpittoors). | |||||
* [Core] Fixed an issue when reading the body of a package from a disconnected sender (thanks to @kpreisser). | |||||
* [Core] Fixed wrong parsing of the body length (thanks to @kpreisser). | |||||
* [Client] The client interfaces are now implementing _IDisposable_. | |||||
* [Client] The disconnected event now contains the exception which was thrown and causing the disconnect. | |||||
* [Server] Fixed an issue which lets the server block 1 second after accepting a new connection (thanks to @kpreisser). | |||||
* [Server] The server now allows managing client subscriptions. | |||||
* [Server] Added events for topic subscriptions. | |||||
<releaseNotes>* [Client] The _ManagedClient_ now has an event which is fired after a queued application message was processed (including exception). | |||||
* [Client] The _ManagedClient_ now supports unsubscribing (thanks to @lerppana) | |||||
* [Server] Fixed some minor async issues. | |||||
* [Server] Fixed wrong comparison of the topic and QoS for retained messages. | |||||
</releaseNotes> | </releaseNotes> | ||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
@@ -1,6 +1,7 @@ | |||||
param([string]$assemblyVersion, [string]$nugetVersion) | param([string]$assemblyVersion, [string]$nugetVersion) | ||||
if ([string]::IsNullOrEmpty($version)) {$version = "0.0.1"} | |||||
if ([string]::IsNullOrEmpty($assemblyVersion)) {$assemblyVersion = "0.0.1"} | |||||
if ([string]::IsNullOrEmpty($nugetVersion)) {$nugetVersion = "0.0.1"} | |||||
$vswhere = ${Env:\ProgramFiles(x86)} + '\Microsoft Visual Studio\Installer\vswhere' | $vswhere = ${Env:\ProgramFiles(x86)} + '\Microsoft Visual Studio\Installer\vswhere' | ||||
@@ -9,14 +10,14 @@ if ($path) { | |||||
$msbuild = join-path $path 'MSBuild\15.0\Bin\MSBuild.exe' | $msbuild = join-path $path 'MSBuild\15.0\Bin\MSBuild.exe' | ||||
# Build the core library | # Build the core library | ||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
&$msbuild ..\Frameworks\MQTTnet.Netstandard\MQTTnet.Netstandard.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
# Build the ASP.NET Core 2.0 extension | # Build the ASP.NET Core 2.0 extension | ||||
&$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||||
&$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||||
# Build the RPC extension | # Build the RPC extension | ||||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | ||||
@@ -25,7 +26,8 @@ if ($path) { | |||||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | ||||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | ||||
Remove-Item .\NuGet -Force -Recurse | |||||
Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue | |||||
New-Item -ItemType Directory -Force -Path .\NuGet | New-Item -ItemType Directory -Force -Path .\NuGet | ||||
.\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | .\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | ||||
.\NuGet.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | .\NuGet.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | ||||
@@ -3,4 +3,7 @@ In order to import the key for code signing on a new developer machine use the f | |||||
> sn –i codeSigningKey.pfx VS_KEY_EFCA4C5B6DFD4B4F | > sn –i codeSigningKey.pfx VS_KEY_EFCA4C5B6DFD4B4F | ||||
The container name may be different. | |||||
The container name may be different. | |||||
# Check if the assembly has a strong name | |||||
> sn -vf MQTTnet.dll |
@@ -20,6 +20,7 @@ | |||||
<TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | <TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | ||||
<TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | <TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | ||||
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | <DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | ||||
<DefaultLanguage>en</DefaultLanguage> | |||||
<LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | <LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -15,8 +15,7 @@ | |||||
<Description /> | <Description /> | ||||
<Authors /> | <Authors /> | ||||
<PackageId /> | <PackageId /> | ||||
<SignAssembly>true</SignAssembly> | |||||
<AssemblyOriginatorKeyFile>codeSigningKey.pfx</AssemblyOriginatorKeyFile> | |||||
<SignAssembly>false</SignAssembly> | |||||
<DelaySign>false</DelaySign> | <DelaySign>false</DelaySign> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -29,6 +28,7 @@ | |||||
<TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | <TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | ||||
<TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | <TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | ||||
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | <DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | ||||
<DefaultLanguage>en</DefaultLanguage> | |||||
<LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | <LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -0,0 +1,19 @@ | |||||
using System; | |||||
namespace MQTTnet.ManagedClient | |||||
{ | |||||
public class ApplicationMessageProcessedEventArgs : EventArgs | |||||
{ | |||||
public ApplicationMessageProcessedEventArgs(MqttApplicationMessage applicationMessage, Exception exception) | |||||
{ | |||||
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); | |||||
Exception = exception; | |||||
} | |||||
public MqttApplicationMessage ApplicationMessage { get; } | |||||
public Exception Exception { get; } | |||||
public bool HasFailed => Exception != null; | |||||
public bool HasSucceeded => Exception == null; | |||||
} | |||||
} |
@@ -12,6 +12,8 @@ namespace MQTTnet.ManagedClient | |||||
event EventHandler<MqttClientConnectedEventArgs> Connected; | event EventHandler<MqttClientConnectedEventArgs> Connected; | ||||
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | ||||
event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | |||||
Task StartAsync(IManagedMqttClientOptions options); | Task StartAsync(IManagedMqttClientOptions options); | ||||
Task StopAsync(); | Task StopAsync(); | ||||
@@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient | |||||
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>(); | private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>(); | ||||
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); | private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); | ||||
private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); | private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); | ||||
private readonly List<string> _unsubscriptions = new List<string>(); | |||||
private readonly IMqttClient _mqttClient; | private readonly IMqttClient _mqttClient; | ||||
private readonly IMqttNetLogger _logger; | private readonly IMqttNetLogger _logger; | ||||
@@ -27,7 +28,7 @@ namespace MQTTnet.ManagedClient | |||||
private IManagedMqttClientOptions _options; | private IManagedMqttClientOptions _options; | ||||
private bool _subscriptionsNotPushed; | private bool _subscriptionsNotPushed; | ||||
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) | public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) | ||||
{ | { | ||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
@@ -43,6 +44,7 @@ namespace MQTTnet.ManagedClient | |||||
public event EventHandler<MqttClientConnectedEventArgs> Connected; | public event EventHandler<MqttClientConnectedEventArgs> Connected; | ||||
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | ||||
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | ||||
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | |||||
public async Task StartAsync(IManagedMqttClientOptions options) | public async Task StartAsync(IManagedMqttClientOptions options) | ||||
{ | { | ||||
@@ -57,7 +59,7 @@ namespace MQTTnet.ManagedClient | |||||
if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); | if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); | ||||
_options = options; | _options = options; | ||||
if (_options.Storage != null) | if (_options.Storage != null) | ||||
{ | { | ||||
_storageManager = new ManagedMqttClientStorageManager(_options.Storage); | _storageManager = new ManagedMqttClientStorageManager(_options.Storage); | ||||
@@ -65,9 +67,9 @@ namespace MQTTnet.ManagedClient | |||||
} | } | ||||
_connectionCancellationToken = new CancellationTokenSource(); | _connectionCancellationToken = new CancellationTokenSource(); | ||||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | ||||
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false); | |||||
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token).ConfigureAwait(false), _connectionCancellationToken.Token).ConfigureAwait(false); | |||||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | ||||
_logger.Info<ManagedMqttClient>("Started"); | _logger.Info<ManagedMqttClient>("Started"); | ||||
@@ -85,7 +87,7 @@ namespace MQTTnet.ManagedClient | |||||
return Task.FromResult(0); | return Task.FromResult(0); | ||||
} | } | ||||
public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages) | public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages) | ||||
{ | { | ||||
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); | if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); | ||||
@@ -96,7 +98,7 @@ namespace MQTTnet.ManagedClient | |||||
{ | { | ||||
await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); | await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); | ||||
} | } | ||||
_messageQueue.Add(applicationMessage); | _messageQueue.Add(applicationMessage); | ||||
} | } | ||||
} | } | ||||
@@ -129,6 +131,7 @@ namespace MQTTnet.ManagedClient | |||||
{ | { | ||||
if (_subscriptions.Remove(topic)) | if (_subscriptions.Remove(topic)) | ||||
{ | { | ||||
_unsubscriptions.Add(topic); | |||||
_subscriptionsNotPushed = true; | _subscriptionsNotPushed = true; | ||||
} | } | ||||
} | } | ||||
@@ -153,7 +156,7 @@ namespace MQTTnet.ManagedClient | |||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested) | while (!cancellationToken.IsCancellationRequested) | ||||
{ | { | ||||
await TryMaintainConnectionAsync(cancellationToken); | |||||
await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
@@ -184,11 +187,11 @@ namespace MQTTnet.ManagedClient | |||||
if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) | if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) | ||||
{ | { | ||||
await PushSubscriptionsAsync().ConfigureAwait(false); | |||||
await SynchronizeSubscriptionsAsync().ConfigureAwait(false); | |||||
StartPublishing(); | StartPublishing(); | ||||
return; | return; | ||||
} | } | ||||
@@ -209,7 +212,7 @@ namespace MQTTnet.ManagedClient | |||||
_logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection."); | _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection."); | ||||
} | } | ||||
} | } | ||||
private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) | private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) | ||||
{ | { | ||||
try | try | ||||
@@ -227,7 +230,7 @@ namespace MQTTnet.ManagedClient | |||||
continue; | continue; | ||||
} | } | ||||
await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); | |||||
await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
@@ -245,6 +248,7 @@ namespace MQTTnet.ManagedClient | |||||
private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) | private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) | ||||
{ | { | ||||
Exception transmitException = null; | |||||
try | try | ||||
{ | { | ||||
await _mqttClient.PublishAsync(message).ConfigureAwait(false); | await _mqttClient.PublishAsync(message).ConfigureAwait(false); | ||||
@@ -256,6 +260,8 @@ namespace MQTTnet.ManagedClient | |||||
} | } | ||||
catch (MqttCommunicationException exception) | catch (MqttCommunicationException exception) | ||||
{ | { | ||||
transmitException = exception; | |||||
_logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed."); | _logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed."); | ||||
if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | ||||
@@ -265,27 +271,38 @@ namespace MQTTnet.ManagedClient | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
transmitException = exception; | |||||
_logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message."); | _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message."); | ||||
} | } | ||||
finally | |||||
{ | |||||
ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); | |||||
} | |||||
} | } | ||||
private async Task PushSubscriptionsAsync() | |||||
private async Task SynchronizeSubscriptionsAsync() | |||||
{ | { | ||||
_logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions"); | _logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions"); | ||||
List<TopicFilter> subscriptions; | List<TopicFilter> subscriptions; | ||||
List<string> unsubscriptions; | |||||
await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); | await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); | ||||
try | try | ||||
{ | { | ||||
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); | subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); | ||||
unsubscriptions = new List<string>(_unsubscriptions); | |||||
_unsubscriptions.Clear(); | |||||
_subscriptionsNotPushed = false; | _subscriptionsNotPushed = false; | ||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
_subscriptionsSemaphore.Release(); | _subscriptionsSemaphore.Release(); | ||||
} | } | ||||
if (!subscriptions.Any()) | |||||
if (!subscriptions.Any() && !unsubscriptions.Any()) | |||||
{ | { | ||||
return; | return; | ||||
} | } | ||||
@@ -293,6 +310,7 @@ namespace MQTTnet.ManagedClient | |||||
try | try | ||||
{ | { | ||||
await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); | await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); | ||||
await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false); | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
@@ -145,16 +145,16 @@ namespace MQTTnet.Server | |||||
PendingMessagesQueue.Enqueue(publishPacket); | PendingMessagesQueue.Enqueue(publishPacket); | ||||
} | } | ||||
public Task SubscribeAsync(IList<TopicFilter> topicFilters) | |||||
public async Task SubscribeAsync(IList<TopicFilter> topicFilters) | |||||
{ | { | ||||
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | ||||
var response = SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket | |||||
await SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket | |||||
{ | { | ||||
TopicFilters = topicFilters | TopicFilters = topicFilters | ||||
}); | |||||
}).ConfigureAwait(false); | |||||
return response; | |||||
await EnqueueSubscribedRetainedMessagesAsync(topicFilters).ConfigureAwait(false); | |||||
} | } | ||||
public Task UnsubscribeAsync(IList<string> topicFilters) | public Task UnsubscribeAsync(IList<string> topicFilters) | ||||
@@ -270,7 +270,7 @@ namespace MQTTnet.Server | |||||
await StopAsync().ConfigureAwait(false); | await StopAsync().ConfigureAwait(false); | ||||
} | } | ||||
await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false); | |||||
await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false); | |||||
} | } | ||||
private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) | private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) | ||||
@@ -279,12 +279,12 @@ namespace MQTTnet.Server | |||||
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); | await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); | ||||
} | } | ||||
private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket) | |||||
private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters) | |||||
{ | { | ||||
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket); | |||||
foreach (var publishPacket in retainedMessages) | |||||
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters); | |||||
foreach (var applicationMessage in retainedMessages) | |||||
{ | { | ||||
await EnqueueApplicationMessageAsync(publishPacket); | |||||
await EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
@@ -220,7 +220,8 @@ namespace MQTTnet.Server | |||||
private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) | private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) | ||||
{ | { | ||||
if (_options.ApplicationMessageInterceptor == null) | |||||
var interceptor = _options.ApplicationMessageInterceptor; | |||||
if (interceptor == null) | |||||
{ | { | ||||
return applicationMessage; | return applicationMessage; | ||||
} | } | ||||
@@ -229,7 +230,7 @@ namespace MQTTnet.Server | |||||
senderClientSession.ClientId, | senderClientSession.ClientId, | ||||
applicationMessage); | applicationMessage); | ||||
_options.ApplicationMessageInterceptor(interceptorContext); | |||||
interceptor(interceptorContext); | |||||
return interceptorContext.ApplicationMessage; | return interceptorContext.ApplicationMessage; | ||||
} | } | ||||
@@ -4,7 +4,6 @@ using System.Linq; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Packets; | |||||
namespace MQTTnet.Server | namespace MQTTnet.Server | ||||
{ | { | ||||
@@ -68,7 +67,7 @@ namespace MQTTnet.Server | |||||
} | } | ||||
} | } | ||||
public async Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(MqttSubscribePacket subscribePacket) | |||||
public async Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(ICollection<TopicFilter> topicFilters) | |||||
{ | { | ||||
var retainedMessages = new List<MqttApplicationMessage>(); | var retainedMessages = new List<MqttApplicationMessage>(); | ||||
@@ -77,13 +76,8 @@ namespace MQTTnet.Server | |||||
{ | { | ||||
foreach (var retainedMessage in _retainedMessages.Values) | foreach (var retainedMessage in _retainedMessages.Values) | ||||
{ | { | ||||
foreach (var topicFilter in subscribePacket.TopicFilters) | |||||
foreach (var topicFilter in topicFilters) | |||||
{ | { | ||||
if (retainedMessage.QualityOfServiceLevel < topicFilter.QualityOfServiceLevel) | |||||
{ | |||||
continue; | |||||
} | |||||
if (!MqttTopicFilterComparer.IsMatch(retainedMessage.Topic, topicFilter.Topic)) | if (!MqttTopicFilterComparer.IsMatch(retainedMessage.Topic, topicFilter.Topic)) | ||||
{ | { | ||||
continue; | continue; | ||||
@@ -15,8 +15,7 @@ | |||||
<Description /> | <Description /> | ||||
<Authors /> | <Authors /> | ||||
<PackageId /> | <PackageId /> | ||||
<SignAssembly>true</SignAssembly> | |||||
<AssemblyOriginatorKeyFile>codeSigningKey.pfx</AssemblyOriginatorKeyFile> | |||||
<SignAssembly>false</SignAssembly> | |||||
<DelaySign>false</DelaySign> | <DelaySign>false</DelaySign> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -29,6 +28,7 @@ | |||||
<TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | <TargetFrameworkIdentifier>.NETCore</TargetFrameworkIdentifier> | ||||
<TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | <TargetFrameworkVersion>v5.0</TargetFrameworkVersion> | ||||
<DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | <DefineConstants>$(DefineConstants);WINDOWS_UWP</DefineConstants> | ||||
<DefaultLanguage>en</DefaultLanguage> | |||||
<LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | <LanguageTargets>$(MSBuildExtensionsPath)\Microsoft\WindowsXaml\v$(VisualStudioVersion)\Microsoft.Windows.UI.Xaml.CSharp.targets</LanguageTargets> | ||||
</PropertyGroup> | </PropertyGroup> | ||||