Browse Source

Refactor code.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
bddcbf6999
3 changed files with 10 additions and 5 deletions
  1. +5
    -5
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  2. +4
    -0
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs
  3. +1
    -0
      Source/MQTTnet/Internal/BlockingQueue.cs

+ 5
- 5
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -113,7 +113,7 @@ namespace MQTTnet.Extensions.ManagedClient
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));


ManagedMqttApplicationMessage skippedMessage = null;
ManagedMqttApplicationMessage removedMessage = null;
lock (_messageQueue) lock (_messageQueue)
{ {
if (_messageQueue.Count >= _options.MaxPendingMessages) if (_messageQueue.Count >= _options.MaxPendingMessages)
@@ -127,9 +127,9 @@ namespace MQTTnet.Extensions.ManagedClient


if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{ {
skippedMessage = _messageQueue.RemoveFirst();
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full."); _logger.Verbose("Removed oldest application message from internal queue because it is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(skippedMessage));
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(removedMessage));
} }
} }


@@ -138,9 +138,9 @@ namespace MQTTnet.Extensions.ManagedClient


if (_storageManager != null) if (_storageManager != null)
{ {
if (skippedMessage != null)
if (removedMessage != null)
{ {
await _storageManager.RemoveAsync(skippedMessage).ConfigureAwait(false);
await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
} }


await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);


+ 4
- 0
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs View File

@@ -28,6 +28,8 @@ namespace MQTTnet.Extensions.ManagedClient


public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage) public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage)
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{ {
_messages.Add(applicationMessage); _messages.Add(applicationMessage);
@@ -37,6 +39,8 @@ namespace MQTTnet.Extensions.ManagedClient


public async Task RemoveAsync(ManagedMqttApplicationMessage applicationMessage) public async Task RemoveAsync(ManagedMqttApplicationMessage applicationMessage)
{ {
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{ {
var index = _messages.IndexOf(applicationMessage); var index = _messages.IndexOf(applicationMessage);


+ 1
- 0
Source/MQTTnet/Internal/BlockingQueue.cs View File

@@ -62,6 +62,7 @@ namespace MQTTnet.Internal
{ {
var item = _items.First; var item = _items.First;
_items.RemoveFirst(); _items.RemoveFirst();

return item.Value; return item.Value;
} }
} }


Loading…
Cancel
Save