diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index c9053e4..0607d93 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -349,7 +349,7 @@ namespace MQTTnet.Extensions.ManagedClient // of the messages, the DropOldestQueuedMessage strategy would // be unable to know which message is actually the oldest and would // instead drop the first item in the queue. - var message = _messageQueue.PeekAndWait(); + var message = _messageQueue.PeekAndWait(cancellationToken); if (message == null) { continue; diff --git a/Source/MQTTnet/Internal/BlockingQueue.cs b/Source/MQTTnet/Internal/BlockingQueue.cs index 485f644..871c40a 100644 --- a/Source/MQTTnet/Internal/BlockingQueue.cs +++ b/Source/MQTTnet/Internal/BlockingQueue.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Internal { private readonly object _syncRoot = new object(); private readonly LinkedList _items = new LinkedList(); - private readonly ManualResetEvent _gate = new ManualResetEvent(false); + private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(false); public int Count { @@ -32,7 +32,7 @@ namespace MQTTnet.Internal } } - public TItem Dequeue() + public TItem Dequeue(CancellationToken cancellationToken = default(CancellationToken)) { while (true) { @@ -52,11 +52,11 @@ namespace MQTTnet.Internal } } - _gate.WaitOne(); + _gate.Wait(cancellationToken); } } - public TItem PeekAndWait() + public TItem PeekAndWait(CancellationToken cancellationToken = default(CancellationToken)) { while (true) { @@ -73,7 +73,7 @@ namespace MQTTnet.Internal } } - _gate.WaitOne(); + _gate.Wait(cancellationToken); } }