diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index a7f5a1c..98c9984 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly Dictionary _subscriptions = new Dictionary(); private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); + private readonly List _unsubscriptions = new List(); private readonly IMqttClient _mqttClient; private readonly IMqttNetLogger _logger; @@ -129,6 +130,7 @@ namespace MQTTnet.ManagedClient { if (_subscriptions.Remove(topic)) { + _unsubscriptions.Add(topic); _subscriptionsNotPushed = true; } } @@ -274,16 +276,26 @@ namespace MQTTnet.ManagedClient _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions"); List subscriptions; + List unsubscriptions; + await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); try { subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); + + unsubscriptions = new List(_unsubscriptions); + _unsubscriptions.Clear(); + _subscriptionsNotPushed = false; } finally { _subscriptionsSemaphore.Release(); } + if (unsubscriptions.Any()) + { + await _mqttClient.UnsubscribeAsync(unsubscriptions); + } if (!subscriptions.Any()) {