From e1590fc0c8314e4117f001769b14dfc9a990c7eb Mon Sep 17 00:00:00 2001 From: Paul Fake Date: Wed, 17 Jul 2019 16:55:56 -0400 Subject: [PATCH] Fixed storage queue race condition Sometimes, TryPublishQueuedMessageAsync would try to remove a message from the storage queue before PublishAsync added it to the storage queue, resulting in a message being stuck in the storage queue forever. Switched the message queue lock to an async lock and synchronized the storage queue updates with the message queue updates. --- .../ManagedMqttClient.cs | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 74d148d..a6ee48b 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -24,6 +24,8 @@ namespace MQTTnet.Extensions.ManagedClient private readonly IMqttClient _mqttClient; private readonly IMqttNetChildLogger _logger; + + private readonly AsyncLock _messageQueueLock = new AsyncLock(); private CancellationTokenSource _connectionCancellationToken; private CancellationTokenSource _publishingCancellationToken; @@ -147,7 +149,7 @@ namespace MQTTnet.Extensions.ManagedClient try { - lock (_messageQueue) + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) { if (_messageQueue.Count >= Options.MaxPendingMessages) { @@ -167,6 +169,16 @@ namespace MQTTnet.Extensions.ManagedClient } _messageQueue.Enqueue(applicationMessage); + + if (_storageManager != null) + { + if (removedMessage != null) + { + await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); + } + + await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); + } } } finally @@ -181,16 +193,6 @@ namespace MQTTnet.Extensions.ManagedClient } } - - if (_storageManager != null) - { - if (removedMessage != null) - { - await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); - } - - await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); - } } public Task SubscribeAsync(IEnumerable topicFilters) @@ -377,7 +379,7 @@ namespace MQTTnet.Extensions.ManagedClient { await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); - lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync { // While publishing this message, this.PublishAsync could have booted this // message off the queue to make room for another (when using a cap @@ -386,11 +388,11 @@ namespace MQTTnet.Extensions.ManagedClient // it from the queue. If not, that means this.PublishAsync has already // removed it, in which case we don't want to do anything. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); - } - - if (_storageManager != null) - { - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } } } catch (MqttCommunicationException exception) @@ -408,14 +410,14 @@ namespace MQTTnet.Extensions.ManagedClient //contradict the expected behavior of QoS 1 and 2, that's also true //for the usage of a message queue cap, so it's still consistent //with prior behavior in that way. - lock (_messageQueue) //lock to avoid conflict with this.PublishAsync + using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync { _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); - } - - if (_storageManager != null) - { - await _storageManager.RemoveAsync(message).ConfigureAwait(false); + + if (_storageManager != null) + { + await _storageManager.RemoveAsync(message).ConfigureAwait(false); + } } } } @@ -533,4 +535,4 @@ namespace MQTTnet.Extensions.ManagedClient _connectionCancellationToken = null; } } -} \ No newline at end of file +}