diff --git a/.gitignore b/.gitignore index 7c38d7d..1e47707 100644 --- a/.gitignore +++ b/.gitignore @@ -293,3 +293,4 @@ Build/nuget.exe *.map *codeSigningKey.pfx +/Tests/MQTTnet.TestApp.NetCore/RetainedMessages.json diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index ab982c0..096c2b1 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -16,6 +16,8 @@ * [Core] Fixed a logging issue when dealing with IOExceptions. * [Core] Fixed a typo in the global logger class (BREAKING CHANGE! Please find new example in Wiki). * [Client] Fixed an issue in _ManagedClient_ which can cause the client to stop when publishing subscriptions. +* [Client] Fixed an issue in _ManagedClient_ which prevents changing the QoS of an existing subscription (BREAKING CHANGE!). +* [Client] Fixed an issue in _MqttClientOptionsBuilder_ which prevents adding TLS options to the client options when calling _Build()_. (Thanks to @cvellan). * [Server] The application message interceptor can now delete any received application message. * [Server] Added a ConnectionValidator context to align with other APIs (BREAKING CHANGE! Please find new example in Wiki). * [Server] Added an interface for the _MqttServerOptions_. diff --git a/Frameworks/MQTTnet.NetStandard/Client/IMqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/IMqttClient.cs index 0ac2f49..42ca0a4 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/IMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/IMqttClient.cs @@ -15,6 +15,6 @@ namespace MQTTnet.Client Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); - Task UnsubscribeAsync(IEnumerable topicFilters); + Task UnsubscribeAsync(IEnumerable topics); } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index b5592e0..9625808 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -12,7 +12,7 @@ using MQTTnet.Protocol; namespace MQTTnet.Client { - public class MqttClient : IMqttClient + public class MqttClient : IMqttClient, IDisposable { private readonly IMqttClientAdapterFactory _adapterFactory; private readonly MqttPacketDispatcher _packetDispatcher; @@ -180,6 +180,11 @@ namespace MQTTnet.Client } } + public void Dispose() + { + _cancellationTokenSource?.Dispose(); + } + private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) { var connectPacket = new MqttConnectPacket diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs index 079e635..ad766c3 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClientOptionsBuilder.cs @@ -119,11 +119,11 @@ namespace MQTTnet.Client if (_tcpOptions != null) { - _options.ChannelOptions = _tcpOptions; + _tcpOptions.TlsOptions = _tlsOptions; } - else + else if (_webSocketOptions != null) { - _options.ChannelOptions = _webSocketOptions; + _webSocketOptions.TlsOptions = _tlsOptions; } } diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs index 41c0ce3..2475959 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs @@ -16,6 +16,6 @@ namespace MQTTnet.ManagedClient Task StopAsync(); Task SubscribeAsync(IEnumerable topicFilters); - Task UnsubscribeAsync(IEnumerable topicFilters); + Task UnsubscribeAsync(IEnumerable topics); } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index 9b01156..01fa4cd 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -11,10 +11,10 @@ using MQTTnet.Protocol; namespace MQTTnet.ManagedClient { - public class ManagedMqttClient : IManagedMqttClient + public class ManagedMqttClient : IManagedMqttClient, IDisposable { private readonly BlockingCollection _messageQueue = new BlockingCollection(); - private readonly HashSet _subscriptions = new HashSet(); + private readonly Dictionary _subscriptions = new Dictionary(); private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); private readonly IMqttClient _mqttClient; @@ -75,11 +75,8 @@ namespace MQTTnet.ManagedClient public Task StopAsync() { - _connectionCancellationToken?.Cancel(false); - _connectionCancellationToken = null; - - _publishingCancellationToken?.Cancel(false); - _publishingCancellationToken = null; + StopPublishing(); + StopMaintainingConnection(); while (_messageQueue.Any()) { @@ -88,7 +85,7 @@ namespace MQTTnet.ManagedClient return Task.FromResult(0); } - + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); @@ -113,10 +110,8 @@ namespace MQTTnet.ManagedClient { foreach (var topicFilter in topicFilters) { - if (_subscriptions.Add(topicFilter)) - { - _subscriptionsNotPushed = true; - } + _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + _subscriptionsNotPushed = true; } } finally @@ -125,14 +120,14 @@ namespace MQTTnet.ManagedClient } } - public async Task UnsubscribeAsync(IEnumerable topicFilters) + public async Task UnsubscribeAsync(IEnumerable topics) { await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); try { - foreach (var topicFilter in topicFilters) + foreach (var topic in topics) { - if (_subscriptions.Remove(topicFilter)) + if (_subscriptions.Remove(topic)) { _subscriptionsNotPushed = true; } @@ -144,6 +139,14 @@ namespace MQTTnet.ManagedClient } } + public void Dispose() + { + _messageQueue?.Dispose(); + _subscriptionsSemaphore?.Dispose(); + _connectionCancellationToken?.Dispose(); + _publishingCancellationToken?.Dispose(); + } + private async Task MaintainConnectionAsync(CancellationToken cancellationToken) { try @@ -174,9 +177,7 @@ namespace MQTTnet.ManagedClient var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false); if (connectionState == ReconnectionResult.NotConnected) { - _publishingCancellationToken?.Cancel(false); - _publishingCancellationToken = null; - + StopPublishing(); await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); return; } @@ -185,18 +186,15 @@ namespace MQTTnet.ManagedClient { await PushSubscriptionsAsync().ConfigureAwait(false); - _publishingCancellationToken = new CancellationTokenSource(); - -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(async () => await PublishQueuedMessagesAsync(_publishingCancellationToken.Token).ConfigureAwait(false), _publishingCancellationToken.Token).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + StartPublishing(); + return; } if (connectionState == ReconnectionResult.StillConnected) { - await Task.Delay(TimeSpan.FromSeconds(1), _connectionCancellationToken.Token).ConfigureAwait(false); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -211,7 +209,7 @@ namespace MQTTnet.ManagedClient _logger.Error(exception, "Unhandled exception while maintaining connection."); } } - + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try @@ -279,7 +277,7 @@ namespace MQTTnet.ManagedClient await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); try { - subscriptions = _subscriptions.ToList(); + subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); _subscriptionsNotPushed = false; } finally @@ -335,5 +333,35 @@ namespace MQTTnet.ManagedClient { Connected?.Invoke(this, eventArgs); } + + private void StartPublishing() + { + if (_publishingCancellationToken != null) + { + StopPublishing(); + } + + var cts = new CancellationTokenSource(); + + _publishingCancellationToken = cts; + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(async () => await PublishQueuedMessagesAsync(cts.Token).ConfigureAwait(false), cts.Token).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private void StopPublishing() + { + _publishingCancellationToken?.Cancel(false); + _publishingCancellationToken?.Dispose(); + _publishingCancellationToken = null; + } + + private void StopMaintainingConnection() + { + _connectionCancellationToken?.Cancel(false); + _connectionCancellationToken?.Dispose(); + _connectionCancellationToken = null; + } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs index a321d93..9615f86 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs @@ -232,8 +232,8 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket, ClientId); - await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false); + if (subscribeResult.CloseConnection) { await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false); diff --git a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs index da126d6..1dad73d 100644 --- a/Frameworks/MQTTnet.NetStandard/TopicFilter.cs +++ b/Frameworks/MQTTnet.NetStandard/TopicFilter.cs @@ -10,14 +10,9 @@ namespace MQTTnet QualityOfServiceLevel = qualityOfServiceLevel; } - public string Topic { get; } + public string Topic { get; set; } - public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } - - public override int GetHashCode() - { - return QualityOfServiceLevel.GetHashCode() ^ (Topic ?? string.Empty).GetHashCode(); - } + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } public override string ToString() { diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index 21d1fdf..e23e994 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -45,7 +45,8 @@ namespace MQTTnet.TestApp.NetCore await managedClient.StartAsync(options); await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); - + await managedClient.SubscribeAsync(new TopicFilter("abc", MqttQualityOfServiceLevel.AtMostOnce)); + await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build()); Console.WriteLine("Managed client started.");