|
|
@@ -24,6 +24,8 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
|
|
|
|
private readonly IMqttClient _mqttClient; |
|
|
|
private readonly IMqttNetChildLogger _logger; |
|
|
|
|
|
|
|
private readonly AsyncLock _messageQueueLock = new AsyncLock(); |
|
|
|
|
|
|
|
private CancellationTokenSource _connectionCancellationToken; |
|
|
|
private CancellationTokenSource _publishingCancellationToken; |
|
|
@@ -147,7 +149,7 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
lock (_messageQueue) |
|
|
|
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) |
|
|
|
{ |
|
|
|
if (_messageQueue.Count >= Options.MaxPendingMessages) |
|
|
|
{ |
|
|
@@ -167,6 +169,16 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
} |
|
|
|
|
|
|
|
_messageQueue.Enqueue(applicationMessage); |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
if (removedMessage != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); |
|
|
|
} |
|
|
|
|
|
|
|
await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
finally |
|
|
@@ -181,16 +193,6 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
if (removedMessage != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); |
|
|
|
} |
|
|
|
|
|
|
|
await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) |
|
|
@@ -377,7 +379,7 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
{ |
|
|
|
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); |
|
|
|
|
|
|
|
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync |
|
|
|
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync |
|
|
|
{ |
|
|
|
// While publishing this message, this.PublishAsync could have booted this |
|
|
|
// message off the queue to make room for another (when using a cap |
|
|
@@ -386,11 +388,11 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
// it from the queue. If not, that means this.PublishAsync has already |
|
|
|
// removed it, in which case we don't want to do anything. |
|
|
|
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); |
|
|
|
} |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(message).ConfigureAwait(false); |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(message).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (MqttCommunicationException exception) |
|
|
@@ -408,14 +410,14 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
//contradict the expected behavior of QoS 1 and 2, that's also true |
|
|
|
//for the usage of a message queue cap, so it's still consistent |
|
|
|
//with prior behavior in that way. |
|
|
|
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync |
|
|
|
using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync |
|
|
|
{ |
|
|
|
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); |
|
|
|
} |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(message).ConfigureAwait(false); |
|
|
|
|
|
|
|
if (_storageManager != null) |
|
|
|
{ |
|
|
|
await _storageManager.RemoveAsync(message).ConfigureAwait(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -533,4 +535,4 @@ namespace MQTTnet.Extensions.ManagedClient |
|
|
|
_connectionCancellationToken = null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |