diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 6dec29f..271e6b9 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -113,7 +113,7 @@ namespace MQTTnet.Extensions.ManagedClient { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - ManagedMqttApplicationMessage skippedMessage = null; + ManagedMqttApplicationMessage removedMessage = null; lock (_messageQueue) { if (_messageQueue.Count >= _options.MaxPendingMessages) @@ -127,9 +127,9 @@ namespace MQTTnet.Extensions.ManagedClient if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) { - skippedMessage = _messageQueue.RemoveFirst(); + removedMessage = _messageQueue.RemoveFirst(); _logger.Verbose("Removed oldest application message from internal queue because it is full."); - ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(skippedMessage)); + ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(removedMessage)); } } @@ -138,9 +138,9 @@ namespace MQTTnet.Extensions.ManagedClient if (_storageManager != null) { - if (skippedMessage != null) + if (removedMessage != null) { - await _storageManager.RemoveAsync(skippedMessage).ConfigureAwait(false); + await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false); } await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs index 0f4e33e..821a7d9 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs @@ -28,6 +28,8 @@ namespace MQTTnet.Extensions.ManagedClient public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage) { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { _messages.Add(applicationMessage); @@ -37,6 +39,8 @@ namespace MQTTnet.Extensions.ManagedClient public async Task RemoveAsync(ManagedMqttApplicationMessage applicationMessage) { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { var index = _messages.IndexOf(applicationMessage); diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index ae6cc98..d6c28f4 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -62,6 +62,7 @@ namespace MQTTnet.Internal { var item = _items.First; _items.RemoveFirst(); + return item.Value; } }