Browse Source

Refactor removal from blocking queue.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
43105f71d8
2 changed files with 6 additions and 9 deletions
  1. +2
    -7
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  2. +4
    -2
      Source/MQTTnet/Internal/BlockingQueue.cs

+ 2
- 7
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -300,7 +300,7 @@ namespace MQTTnet.Extensions.ManagedClient
//in the queue is equal to this message, then it's safe to remove //in the queue is equal to this message, then it's safe to remove
//it from the queue. If not, that means this.PublishAsync has already //it from the queue. If not, that means this.PublishAsync has already
//removed it, in which case we don't want to do anything. //removed it, in which case we don't want to do anything.
_messageQueue.RemoveFirstIfEqual(message, IdsAreEqual);
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
} }
_storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); _storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
} }
@@ -321,7 +321,7 @@ namespace MQTTnet.Extensions.ManagedClient
//with prior behavior in that way. //with prior behavior in that way.
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
{ {
_messageQueue.RemoveFirstIfEqual(message, IdsAreEqual);
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
} }
} }
} }
@@ -335,11 +335,6 @@ namespace MQTTnet.Extensions.ManagedClient
ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
} }
} }
private static bool IdsAreEqual(ManagedMqttApplicationMessage message1, ManagedMqttApplicationMessage message2)
{
return message1.Id.Equals(message2.Id);
}


private async Task SynchronizeSubscriptionsAsync() private async Task SynchronizeSubscriptionsAsync()
{ {


+ 4
- 2
Source/MQTTnet/Internal/BlockingQueue.cs View File

@@ -77,11 +77,13 @@ namespace MQTTnet.Internal
} }
} }


public void RemoveFirstIfEqual(TItem item, Func<TItem, TItem, bool> areEqual)
public void RemoveFirst(Predicate<TItem> match)
{ {
if (match == null) throw new ArgumentNullException(nameof(match));

lock (_syncRoot) lock (_syncRoot)
{ {
if (_items.Count > 0 && areEqual(_items.First.Value, item))
if (_items.Count > 0 && match(_items.First.Value))
{ {
_items.RemoveFirst(); _items.RemoveFirst();
} }


Loading…
Cancel
Save