Browse Source

Merge branch 'develop' of https://github.com/chkr1011/MQTTnet into develop

release/3.x.x
Christian 6 years ago
parent
commit
0accb467b5
1 changed files with 12 additions and 0 deletions
  1. +12
    -0
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs

+ 12
- 0
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs View File

@@ -16,6 +16,7 @@ namespace MQTTnet.ManagedClient
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>(); private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);
private readonly List<string> _unsubscriptions = new List<string>();


private readonly IMqttClient _mqttClient; private readonly IMqttClient _mqttClient;
private readonly IMqttNetLogger _logger; private readonly IMqttNetLogger _logger;
@@ -130,6 +131,7 @@ namespace MQTTnet.ManagedClient
{ {
if (_subscriptions.Remove(topic)) if (_subscriptions.Remove(topic))
{ {
_unsubscriptions.Add(topic);
_subscriptionsNotPushed = true; _subscriptionsNotPushed = true;
} }
} }
@@ -283,16 +285,26 @@ namespace MQTTnet.ManagedClient
_logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions"); _logger.Info<ManagedMqttClient>(nameof(ManagedMqttClient), "Synchronizing subscriptions");


List<TopicFilter> subscriptions; List<TopicFilter> subscriptions;
List<string> unsubscriptions;

await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false); await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try try
{ {
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();

unsubscriptions = new List<string>(_unsubscriptions);
_unsubscriptions.Clear();

_subscriptionsNotPushed = false; _subscriptionsNotPushed = false;
} }
finally finally
{ {
_subscriptionsSemaphore.Release(); _subscriptionsSemaphore.Release();
} }
if (unsubscriptions.Any())
{
await _mqttClient.UnsubscribeAsync(unsubscriptions);
}


if (!subscriptions.Any()) if (!subscriptions.Any())
{ {


Loading…
Cancel
Save