@@ -88,5 +88,17 @@ namespace MQTTnet.Implementations | |||||
#endif | #endif | ||||
} | } | ||||
public static Task CompletedTask | |||||
{ | |||||
get | |||||
{ | |||||
#if NET452 | |||||
return Task.FromResult(0); | |||||
#else | |||||
return Task.CompletedTask; | |||||
#endif | |||||
} | |||||
} | |||||
} | } | ||||
} | } |
@@ -1,131 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace MQTTnet.Internal | |||||
{ | |||||
// Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) | |||||
public class AsyncAutoResetEvent | |||||
{ | |||||
private readonly LinkedList<TaskCompletionSource<bool>> _waiters = new LinkedList<TaskCompletionSource<bool>>(); | |||||
private bool _isSignaled; | |||||
public AsyncAutoResetEvent() | |||||
: this(false) | |||||
{ | |||||
} | |||||
public AsyncAutoResetEvent(bool signaled) | |||||
{ | |||||
_isSignaled = signaled; | |||||
} | |||||
public int WaitersCount | |||||
{ | |||||
get | |||||
{ | |||||
lock (_waiters) | |||||
{ | |||||
return _waiters.Count; | |||||
} | |||||
} | |||||
} | |||||
public Task<bool> WaitOneAsync() | |||||
{ | |||||
return WaitOneAsync(CancellationToken.None); | |||||
} | |||||
public Task<bool> WaitOneAsync(TimeSpan timeout) | |||||
{ | |||||
return WaitOneAsync(timeout, CancellationToken.None); | |||||
} | |||||
public Task<bool> WaitOneAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); | |||||
} | |||||
public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) | |||||
{ | |||||
cancellationToken.ThrowIfCancellationRequested(); | |||||
TaskCompletionSource<bool> tcs; | |||||
lock (_waiters) | |||||
{ | |||||
if (_isSignaled) | |||||
{ | |||||
_isSignaled = false; | |||||
return true; | |||||
} | |||||
if (timeout == TimeSpan.Zero) | |||||
{ | |||||
return _isSignaled; | |||||
} | |||||
tcs = new TaskCompletionSource<bool>(); | |||||
_waiters.AddLast(tcs); | |||||
} | |||||
Task winner; | |||||
if (timeout == Timeout.InfiniteTimeSpan) | |||||
{ | |||||
using (cancellationToken.Register(() => { tcs.TrySetCanceled(); })) | |||||
{ | |||||
await tcs.Task.ConfigureAwait(false); | |||||
winner = tcs.Task; | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); | |||||
} | |||||
var taskWasSignaled = winner == tcs.Task; | |||||
if (taskWasSignaled) | |||||
{ | |||||
return true; | |||||
} | |||||
// We timed-out; remove our reference to the task. | |||||
// This is an O(n) operation since waiters is a LinkedList<T>. | |||||
lock (_waiters) | |||||
{ | |||||
_waiters.Remove(tcs); | |||||
if (winner.Status == TaskStatus.Canceled) | |||||
{ | |||||
throw new OperationCanceledException(cancellationToken); | |||||
} | |||||
throw new TimeoutException(); | |||||
} | |||||
} | |||||
public void Set() | |||||
{ | |||||
TaskCompletionSource<bool> toRelease = null; | |||||
lock (_waiters) | |||||
{ | |||||
if (_waiters.Count > 0) | |||||
{ | |||||
// Signal the first task in the waiters list. | |||||
toRelease = _waiters.First.Value; | |||||
_waiters.RemoveFirst(); | |||||
} | |||||
else if (!_isSignaled) | |||||
{ | |||||
// No tasks are pending | |||||
_isSignaled = true; | |||||
} | |||||
} | |||||
toRelease?.TrySetResult(true); | |||||
} | |||||
} | |||||
} |
@@ -44,12 +44,13 @@ namespace MQTTnet.Internal | |||||
{ | { | ||||
return; | return; | ||||
} | } | ||||
_isDisposed = true; | |||||
// Do not change this code. Put cleanup code in Dispose(bool disposing) above. | // Do not change this code. Put cleanup code in Dispose(bool disposing) above. | ||||
Dispose(true); | Dispose(true); | ||||
// TODO: uncomment the following line if the finalizer is overridden above. | // TODO: uncomment the following line if the finalizer is overridden above. | ||||
// GC.SuppressFinalize(this); | // GC.SuppressFinalize(this); | ||||
_isDisposed = true; | |||||
} | } | ||||
#endregion | #endregion | ||||
} | } | ||||
@@ -9,7 +9,7 @@ namespace MQTTnet.PacketDispatcher | |||||
{ | { | ||||
public sealed class MqttPacketAwaiter<TPacket> : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket | public sealed class MqttPacketAwaiter<TPacket> : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket | ||||
{ | { | ||||
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource = new TaskCompletionSource<MqttBasePacket>(); | |||||
private readonly TaskCompletionSource<MqttBasePacket> _taskCompletionSource; | |||||
private readonly ushort? _packetIdentifier; | private readonly ushort? _packetIdentifier; | ||||
private readonly MqttPacketDispatcher _owningPacketDispatcher; | private readonly MqttPacketDispatcher _owningPacketDispatcher; | ||||
@@ -17,13 +17,18 @@ namespace MQTTnet.PacketDispatcher | |||||
{ | { | ||||
_packetIdentifier = packetIdentifier; | _packetIdentifier = packetIdentifier; | ||||
_owningPacketDispatcher = owningPacketDispatcher ?? throw new ArgumentNullException(nameof(owningPacketDispatcher)); | _owningPacketDispatcher = owningPacketDispatcher ?? throw new ArgumentNullException(nameof(owningPacketDispatcher)); | ||||
#if NET452 | |||||
_taskCompletionSource = new TaskCompletionSource<MqttBasePacket>(); | |||||
#else | |||||
_taskCompletionSource = new TaskCompletionSource<MqttBasePacket>(TaskCreationOptions.RunContinuationsAsynchronously); | |||||
#endif | |||||
} | } | ||||
public async Task<TPacket> WaitOneAsync(TimeSpan timeout) | public async Task<TPacket> WaitOneAsync(TimeSpan timeout) | ||||
{ | { | ||||
using (var timeoutToken = new CancellationTokenSource(timeout)) | using (var timeoutToken = new CancellationTokenSource(timeout)) | ||||
{ | { | ||||
timeoutToken.Token.Register(() => _taskCompletionSource.TrySetException(new MqttCommunicationTimedOutException())); | |||||
timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException())); | |||||
var packet = await _taskCompletionSource.Task.ConfigureAwait(false); | var packet = await _taskCompletionSource.Task.ConfigureAwait(false); | ||||
return (TPacket)packet; | return (TPacket)packet; | ||||
@@ -33,24 +38,47 @@ namespace MQTTnet.PacketDispatcher | |||||
public void Complete(MqttBasePacket packet) | public void Complete(MqttBasePacket packet) | ||||
{ | { | ||||
if (packet == null) throw new ArgumentNullException(nameof(packet)); | if (packet == null) throw new ArgumentNullException(nameof(packet)); | ||||
#if NET452 | |||||
// To prevent deadlocks it is required to call the _TrySetResult_ method | // To prevent deadlocks it is required to call the _TrySetResult_ method | ||||
// from a new thread because the awaiting code will not(!) be executed in | // from a new thread because the awaiting code will not(!) be executed in | ||||
// a new thread automatically (due to await). Furthermore _this_ thread will | // a new thread automatically (due to await). Furthermore _this_ thread will | ||||
// do it. But _this_ thread is also reading incoming packets -> deadlock. | // do it. But _this_ thread is also reading incoming packets -> deadlock. | ||||
// NET452 does not support RunContinuationsAsynchronously | |||||
Task.Run(() => _taskCompletionSource.TrySetResult(packet)); | Task.Run(() => _taskCompletionSource.TrySetResult(packet)); | ||||
#else | |||||
_taskCompletionSource.TrySetResult(packet); | |||||
#endif | |||||
} | } | ||||
public void Fail(Exception exception) | public void Fail(Exception exception) | ||||
{ | { | ||||
if (exception == null) throw new ArgumentNullException(nameof(exception)); | if (exception == null) throw new ArgumentNullException(nameof(exception)); | ||||
#if NET452 | |||||
// To prevent deadlocks it is required to call the _TrySetResult_ method | |||||
// from a new thread because the awaiting code will not(!) be executed in | |||||
// a new thread automatically (due to await). Furthermore _this_ thread will | |||||
// do it. But _this_ thread is also reading incoming packets -> deadlock. | |||||
// NET452 does not support RunContinuationsAsynchronously | |||||
Task.Run(() => _taskCompletionSource.TrySetException(exception)); | Task.Run(() => _taskCompletionSource.TrySetException(exception)); | ||||
#else | |||||
_taskCompletionSource.TrySetException(exception); | |||||
#endif | |||||
} | } | ||||
public void Cancel() | public void Cancel() | ||||
{ | { | ||||
#if NET452 | |||||
// To prevent deadlocks it is required to call the _TrySetResult_ method | |||||
// from a new thread because the awaiting code will not(!) be executed in | |||||
// a new thread automatically (due to await). Furthermore _this_ thread will | |||||
// do it. But _this_ thread is also reading incoming packets -> deadlock. | |||||
// NET452 does not support RunContinuationsAsynchronously | |||||
Task.Run(() => _taskCompletionSource.TrySetCanceled()); | Task.Run(() => _taskCompletionSource.TrySetCanceled()); | ||||
#else | |||||
_taskCompletionSource.TrySetCanceled(); | |||||
#endif | |||||
} | } | ||||
protected override void Dispose(bool disposing) | protected override void Dispose(bool disposing) | ||||
@@ -3,6 +3,7 @@ using System.Collections.Generic; | |||||
using System.Linq; | using System.Linq; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Implementations; | |||||
using MQTTnet.Internal; | using MQTTnet.Internal; | ||||
namespace MQTTnet.Server | namespace MQTTnet.Server | ||||
@@ -21,7 +22,7 @@ namespace MQTTnet.Server | |||||
if (logger == null) throw new ArgumentNullException(nameof(logger)); | if (logger == null) throw new ArgumentNullException(nameof(logger)); | ||||
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); | _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); | ||||
_options = options ?? throw new ArgumentNullException(nameof(options)); | _options = options ?? throw new ArgumentNullException(nameof(options)); | ||||
return Task.CompletedTask; | |||||
return PlatformAbstractionLayer.CompletedTask; | |||||
} | } | ||||
public async Task LoadMessagesAsync() | public async Task LoadMessagesAsync() | ||||
@@ -1,237 +0,0 @@ | |||||
using System; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||||
using MQTTnet.Internal; | |||||
namespace MQTTnet.Tests | |||||
{ | |||||
[TestClass] | |||||
// Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) | |||||
public class AsyncAutoResetEvent_Tests | |||||
{ | |||||
private readonly AsyncAutoResetEvent _aare; | |||||
public AsyncAutoResetEvent_Tests() | |||||
{ | |||||
_aare = new AsyncAutoResetEvent(); | |||||
} | |||||
[TestMethod] | |||||
public async Task Cleanup_Waiters() | |||||
{ | |||||
var @lock = new AsyncAutoResetEvent(); | |||||
var waitOnePassed = false; | |||||
#pragma warning disable 4014 | |||||
Task.Run(async () => | |||||
#pragma warning restore 4014 | |||||
{ | |||||
await @lock.WaitOneAsync(TimeSpan.FromSeconds(2)); | |||||
waitOnePassed = true; | |||||
}); | |||||
await Task.Delay(500); | |||||
Assert.AreEqual(1, @lock.WaitersCount); | |||||
@lock.Set(); | |||||
await Task.Delay(1000); | |||||
Assert.IsTrue(waitOnePassed); | |||||
Assert.AreEqual(0, @lock.WaitersCount); | |||||
} | |||||
[TestMethod] | |||||
public async Task SingleThreadedPulse() | |||||
{ | |||||
for (int i = 0; i < 5; i++) | |||||
{ | |||||
var t = _aare.WaitOneAsync(); | |||||
Assert.IsFalse(t.IsCompleted); | |||||
_aare.Set(); | |||||
await t; | |||||
Assert.IsTrue(t.IsCompleted); | |||||
} | |||||
} | |||||
[TestMethod] | |||||
public async Task MultipleSetOnlySignalsOnce() | |||||
{ | |||||
_aare.Set(); | |||||
_aare.Set(); | |||||
await _aare.WaitOneAsync(); | |||||
var t = _aare.WaitOneAsync(); | |||||
Assert.IsFalse(t.IsCompleted); | |||||
await Task.Delay(500); | |||||
Assert.IsFalse(t.IsCompleted); | |||||
_aare.Set(); | |||||
await t; | |||||
Assert.IsTrue(t.IsCompleted); | |||||
} | |||||
[TestMethod] | |||||
public async Task OrderPreservingQueue() | |||||
{ | |||||
var waiters = new Task[5]; | |||||
for (int i = 0; i < waiters.Length; i++) | |||||
{ | |||||
waiters[i] = _aare.WaitOneAsync(); | |||||
} | |||||
for (int i = 0; i < waiters.Length; i++) | |||||
{ | |||||
_aare.Set(); | |||||
await waiters[i].ConfigureAwait(false); | |||||
} | |||||
} | |||||
// This test does not work in appveyor but on local machine it does!? | |||||
/////// <summary> | |||||
/////// Verifies that inlining continuations do not have to complete execution before Set() returns. | |||||
/////// </summary> | |||||
////[TestMethod] | |||||
////public async Task SetReturnsBeforeInlinedContinuations() | |||||
////{ | |||||
//// var setReturned = new ManualResetEventSlim(); | |||||
//// var inlinedContinuation = _aare.WaitOneAsync() | |||||
//// .ContinueWith(delegate | |||||
//// { | |||||
//// // Arrange to synchronously block the continuation until Set() has returned, | |||||
//// // which would deadlock if Set does not return until inlined continuations complete. | |||||
//// Assert.IsTrue(setReturned.Wait(500)); | |||||
//// }); | |||||
//// await Task.Delay(100); | |||||
//// _aare.Set(); | |||||
//// setReturned.Set(); | |||||
//// Assert.IsTrue(inlinedContinuation.Wait(500)); | |||||
////} | |||||
[TestMethod] | |||||
public void WaitAsync_WithCancellationToken() | |||||
{ | |||||
var cts = new CancellationTokenSource(); | |||||
Task waitTask = _aare.WaitOneAsync(cts.Token); | |||||
Assert.IsFalse(waitTask.IsCompleted); | |||||
// Cancel the request and ensure that it propagates to the task. | |||||
cts.Cancel(); | |||||
try | |||||
{ | |||||
waitTask.GetAwaiter().GetResult(); | |||||
Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | |||||
} | |||||
catch (OperationCanceledException) | |||||
{ | |||||
} | |||||
// Now set the event and verify that a future waiter gets the signal immediately. | |||||
_aare.Set(); | |||||
waitTask = _aare.WaitOneAsync(); | |||||
Assert.AreEqual(TaskStatus.WaitingForActivation, waitTask.Status); | |||||
} | |||||
[TestMethod] | |||||
public void WaitAsync_WithCancellationToken_Precanceled() | |||||
{ | |||||
// We construct our own pre-canceled token so that we can do | |||||
// a meaningful identity check later. | |||||
var tokenSource = new CancellationTokenSource(); | |||||
tokenSource.Cancel(); | |||||
var token = tokenSource.Token; | |||||
// Verify that a pre-set signal is not reset by a canceled wait request. | |||||
_aare.Set(); | |||||
try | |||||
{ | |||||
_aare.WaitOneAsync(token).GetAwaiter().GetResult(); | |||||
Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | |||||
} | |||||
catch (OperationCanceledException ex) | |||||
{ | |||||
Assert.AreEqual(token, ex.CancellationToken); | |||||
} | |||||
// Verify that the signal was not acquired. | |||||
Task waitTask = _aare.WaitOneAsync(); | |||||
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | |||||
} | |||||
[TestMethod] | |||||
public async Task WaitAsync_WithTimeout() | |||||
{ | |||||
Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
Assert.IsFalse(waitTask.IsCompleted); | |||||
// Cancel the request and ensure that it propagates to the task. | |||||
await Task.Delay(1000).ConfigureAwait(false); | |||||
try | |||||
{ | |||||
waitTask.GetAwaiter().GetResult(); | |||||
Assert.IsTrue(false, "Task was expected to transition to a timeout state."); | |||||
} | |||||
catch (TimeoutException) | |||||
{ | |||||
Assert.IsTrue(true); | |||||
} | |||||
// Now set the event and verify that a future waiter gets the signal immediately. | |||||
_aare.Set(); | |||||
waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | |||||
} | |||||
[TestMethod] | |||||
public void WaitAsync_Canceled_DoesNotInlineContinuations() | |||||
{ | |||||
var cts = new CancellationTokenSource(); | |||||
var task = _aare.WaitOneAsync(cts.Token); | |||||
var completingActionFinished = new ManualResetEventSlim(); | |||||
var continuation = task.ContinueWith( | |||||
_ => Assert.IsTrue(completingActionFinished.Wait(500)), | |||||
CancellationToken.None, | |||||
TaskContinuationOptions.None, | |||||
TaskScheduler.Default); | |||||
cts.Cancel(); | |||||
completingActionFinished.Set(); | |||||
// Rethrow the exception if it turned out it deadlocked. | |||||
continuation.GetAwaiter().GetResult(); | |||||
} | |||||
[TestMethod] | |||||
public async Task AsyncAutoResetEvent() | |||||
{ | |||||
var aare = new AsyncAutoResetEvent(); | |||||
var globalI = 0; | |||||
#pragma warning disable 4014 | |||||
Task.Run(async () => | |||||
#pragma warning restore 4014 | |||||
{ | |||||
await aare.WaitOneAsync(CancellationToken.None); | |||||
globalI += 1; | |||||
}); | |||||
#pragma warning disable 4014 | |||||
Task.Run(async () => | |||||
#pragma warning restore 4014 | |||||
{ | |||||
await aare.WaitOneAsync(CancellationToken.None); | |||||
globalI += 2; | |||||
}); | |||||
await Task.Delay(500); | |||||
aare.Set(); | |||||
await Task.Delay(500); | |||||
aare.Set(); | |||||
await Task.Delay(100); | |||||
Assert.AreEqual(3, globalI); | |||||
} | |||||
} | |||||
} |