Browse Source

Replace a manual lock with the lock class.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
4838da48ef
1 changed files with 6 additions and 20 deletions
  1. +6
    -20
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs

+ 6
- 20
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs View File

@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Diagnostics; using MQTTnet.Diagnostics;
using MQTTnet.Exceptions; using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Protocol; using MQTTnet.Protocol;


namespace MQTTnet.ManagedClient namespace MQTTnet.ManagedClient
@@ -15,7 +16,7 @@ namespace MQTTnet.ManagedClient
{ {
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>(); private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);
private readonly AsyncLock _subscriptionsLock = new AsyncLock();
private readonly List<string> _unsubscriptions = new List<string>(); private readonly List<string> _unsubscriptions = new List<string>();


private readonly IMqttClient _mqttClient; private readonly IMqttClient _mqttClient;
@@ -117,8 +118,7 @@ namespace MQTTnet.ManagedClient
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));


await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{ {
foreach (var topicFilter in topicFilters) foreach (var topicFilter in topicFilters)
{ {
@@ -126,16 +126,11 @@ namespace MQTTnet.ManagedClient
_subscriptionsNotPushed = true; _subscriptionsNotPushed = true;
} }
} }
finally
{
_subscriptionsSemaphore.Release();
}
} }


public async Task UnsubscribeAsync(IEnumerable<string> topics) public async Task UnsubscribeAsync(IEnumerable<string> topics)
{ {
await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{ {
foreach (var topic in topics) foreach (var topic in topics)
{ {
@@ -146,16 +141,12 @@ namespace MQTTnet.ManagedClient
} }
} }
} }
finally
{
_subscriptionsSemaphore.Release();
}
} }


public void Dispose() public void Dispose()
{ {
_messageQueue?.Dispose(); _messageQueue?.Dispose();
_subscriptionsSemaphore?.Dispose();
_subscriptionsLock?.Dispose();
_connectionCancellationToken?.Dispose(); _connectionCancellationToken?.Dispose();
_publishingCancellationToken?.Dispose(); _publishingCancellationToken?.Dispose();
} }
@@ -294,8 +285,7 @@ namespace MQTTnet.ManagedClient
List<TopicFilter> subscriptions; List<TopicFilter> subscriptions;
List<string> unsubscriptions; List<string> unsubscriptions;


await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
try
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{ {
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();


@@ -304,10 +294,6 @@ namespace MQTTnet.ManagedClient


_subscriptionsNotPushed = false; _subscriptionsNotPushed = false;
} }
finally
{
_subscriptionsSemaphore.Release();
}
if (!subscriptions.Any() && !unsubscriptions.Any()) if (!subscriptions.Any() && !unsubscriptions.Any())
{ {


Loading…
Cancel
Save