diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index b18d678..d068aee 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly Dictionary _subscriptions = new Dictionary(); private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); + private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; private readonly IMqttNetLogger _logger; @@ -130,6 +131,7 @@ namespace MQTTnet.ManagedClient { if (_subscriptions.Remove(topic)) { + _unsubscriptions.Add(topic); _subscriptionsNotPushed = true; } } @@ -283,16 +285,26 @@ namespace MQTTnet.ManagedClient _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; + List unsubscriptions; + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); try { subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); + + unsubscriptions = new List(_unsubscriptions); + _unsubscriptions.Clear(); + _subscriptionsNotPushed = false; } finally { _subscriptionsSemaphore.Release(); } + if (unsubscriptions.Any()) + { + await _mqttClient.UnsubscribeAsync(unsubscriptions); + } if (!subscriptions.Any()) {