|
|
@@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient |
|
|
|
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>(); |
|
|
|
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); |
|
|
|
private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); |
|
|
|
private readonly List<string> _unsubscriptions = new List<string>(); |
|
|
|
|
|
|
|
private readonly IMqttClient _mqttClient; |
|
|
|
private readonly IMqttNetLogger _logger; |
|
|
@@ -129,6 +130,7 @@ namespace MQTTnet.ManagedClient |
|
|
|
{ |
|
|
|
if (_subscriptions.Remove(topic)) |
|
|
|
{ |
|
|
|
_unsubscriptions.Add(topic); |
|
|
|
_subscriptionsNotPushed = true; |
|
|
|
} |
|
|
|
} |
|
|
@@ -274,16 +276,26 @@ namespace MQTTnet.ManagedClient |
|
|
|
_logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions"); |
|
|
|
|
|
|
|
List<TopicFilter> subscriptions; |
|
|
|
List<string> unsubscriptions; |
|
|
|
|
|
|
|
await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); |
|
|
|
try |
|
|
|
{ |
|
|
|
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); |
|
|
|
|
|
|
|
unsubscriptions = new List<string>(_unsubscriptions); |
|
|
|
_unsubscriptions.Clear(); |
|
|
|
|
|
|
|
_subscriptionsNotPushed = false; |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
_subscriptionsSemaphore.Release(); |
|
|
|
} |
|
|
|
if (unsubscriptions.Any()) |
|
|
|
{ |
|
|
|
await _mqttClient.UnsubscribeAsync(unsubscriptions); |
|
|
|
} |
|
|
|
|
|
|
|
if (!subscriptions.Any()) |
|
|
|
{ |
|
|
|