Fix for #762 - thread pool leakrelease/3.x.x
@@ -2,7 +2,7 @@ | |||||
name: Bug report | name: Bug report | ||||
about: Create a report to help us improve. | about: Create a report to help us improve. | ||||
title: '' | title: '' | ||||
labels: '' | |||||
labels: 'bug' | |||||
assignees: '' | assignees: '' | ||||
--- | --- | ||||
@@ -2,7 +2,7 @@ | |||||
name: Custom issue template | name: Custom issue template | ||||
about: Do you have a question related to the project? Use this template. | about: Do you have a question related to the project? Use this template. | ||||
title: '' | title: '' | ||||
labels: '' | |||||
labels: 'question' | |||||
assignees: '' | assignees: '' | ||||
--- | --- | ||||
@@ -2,7 +2,7 @@ | |||||
name: Feature request | name: Feature request | ||||
about: Suggest an idea for this project. | about: Suggest an idea for this project. | ||||
title: '' | title: '' | ||||
labels: '' | |||||
labels: 'feature-request' | |||||
assignees: '' | assignees: '' | ||||
--- | --- | ||||
@@ -349,7 +349,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
// of the messages, the DropOldestQueuedMessage strategy would | // of the messages, the DropOldestQueuedMessage strategy would | ||||
// be unable to know which message is actually the oldest and would | // be unable to know which message is actually the oldest and would | ||||
// instead drop the first item in the queue. | // instead drop the first item in the queue. | ||||
var message = _messageQueue.PeekAndWait(); | |||||
var message = _messageQueue.PeekAndWait(cancellationToken); | |||||
if (message == null) | if (message == null) | ||||
{ | { | ||||
continue; | continue; | ||||
@@ -8,7 +8,7 @@ namespace MQTTnet.Internal | |||||
{ | { | ||||
private readonly object _syncRoot = new object(); | private readonly object _syncRoot = new object(); | ||||
private readonly LinkedList<TItem> _items = new LinkedList<TItem>(); | private readonly LinkedList<TItem> _items = new LinkedList<TItem>(); | ||||
private readonly ManualResetEvent _gate = new ManualResetEvent(false); | |||||
private readonly ManualResetEventSlim _gate = new ManualResetEventSlim(false); | |||||
public int Count | public int Count | ||||
{ | { | ||||
@@ -32,7 +32,7 @@ namespace MQTTnet.Internal | |||||
} | } | ||||
} | } | ||||
public TItem Dequeue() | |||||
public TItem Dequeue(CancellationToken cancellationToken = default(CancellationToken)) | |||||
{ | { | ||||
while (true) | while (true) | ||||
{ | { | ||||
@@ -52,11 +52,11 @@ namespace MQTTnet.Internal | |||||
} | } | ||||
} | } | ||||
_gate.WaitOne(); | |||||
_gate.Wait(cancellationToken); | |||||
} | } | ||||
} | } | ||||
public TItem PeekAndWait() | |||||
public TItem PeekAndWait(CancellationToken cancellationToken = default(CancellationToken)) | |||||
{ | { | ||||
while (true) | while (true) | ||||
{ | { | ||||
@@ -73,7 +73,7 @@ namespace MQTTnet.Internal | |||||
} | } | ||||
} | } | ||||
_gate.WaitOne(); | |||||
_gate.Wait(cancellationToken); | |||||
} | } | ||||
} | } | ||||