Browse Source

Fixed storage queue race condition

Sometimes, TryPublishQueuedMessageAsync would try to remove a message from the storage queue before PublishAsync added it to the storage queue, resulting in a message being stuck in the storage queue forever.  Switched the message queue lock to an async lock and synchronized the storage queue updates with the message queue updates.
release/3.x.x
Paul Fake 5 years ago
committed by GitHub
parent
commit
e1590fc0c8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 26 additions and 24 deletions
  1. +26
    -24
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs

+ 26
- 24
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -24,6 +24,8 @@ namespace MQTTnet.Extensions.ManagedClient

private readonly IMqttClient _mqttClient;
private readonly IMqttNetChildLogger _logger;
private readonly AsyncLock _messageQueueLock = new AsyncLock();

private CancellationTokenSource _connectionCancellationToken;
private CancellationTokenSource _publishingCancellationToken;
@@ -147,7 +149,7 @@ namespace MQTTnet.Extensions.ManagedClient

try
{
lock (_messageQueue)
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
if (_messageQueue.Count >= Options.MaxPendingMessages)
{
@@ -167,6 +169,16 @@ namespace MQTTnet.Extensions.ManagedClient
}

_messageQueue.Enqueue(applicationMessage);
if (_storageManager != null)
{
if (removedMessage != null)
{
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
}

await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
}
}
}
finally
@@ -181,16 +193,6 @@ namespace MQTTnet.Extensions.ManagedClient
}

}
if (_storageManager != null)
{
if (removedMessage != null)
{
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
}

await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
}
}

public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
@@ -377,7 +379,7 @@ namespace MQTTnet.Extensions.ManagedClient
{
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
{
// While publishing this message, this.PublishAsync could have booted this
// message off the queue to make room for another (when using a cap
@@ -386,11 +388,11 @@ namespace MQTTnet.Extensions.ManagedClient
// it from the queue. If not, that means this.PublishAsync has already
// removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
}
catch (MqttCommunicationException exception)
@@ -408,14 +410,14 @@ namespace MQTTnet.Extensions.ManagedClient
//contradict the expected behavior of QoS 1 and 2, that's also true
//for the usage of a message queue cap, so it's still consistent
//with prior behavior in that way.
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
{
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
}
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
if (_storageManager != null)
{
await _storageManager.RemoveAsync(message).ConfigureAwait(false);
}
}
}
}
@@ -533,4 +535,4 @@ namespace MQTTnet.Extensions.ManagedClient
_connectionCancellationToken = null;
}
}
}
}

Loading…
Cancel
Save