diff --git a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs b/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs deleted file mode 100644 index cd62f07..0000000 --- a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs +++ /dev/null @@ -1,131 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace MQTTnet.Internal -{ - // Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) - public class AsyncAutoResetEvent - { - private readonly LinkedList> _waiters = new LinkedList>(); - - private bool _isSignaled; - - public AsyncAutoResetEvent() - : this(false) - { - } - - public AsyncAutoResetEvent(bool signaled) - { - _isSignaled = signaled; - } - - public int WaitersCount - { - get - { - lock (_waiters) - { - return _waiters.Count; - } - } - } - - public Task WaitOneAsync() - { - return WaitOneAsync(CancellationToken.None); - } - - public Task WaitOneAsync(TimeSpan timeout) - { - return WaitOneAsync(timeout, CancellationToken.None); - } - - public Task WaitOneAsync(CancellationToken cancellationToken) - { - return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); - } - - public async Task WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - TaskCompletionSource tcs; - - lock (_waiters) - { - if (_isSignaled) - { - _isSignaled = false; - return true; - } - - if (timeout == TimeSpan.Zero) - { - return _isSignaled; - } - - tcs = new TaskCompletionSource(); - _waiters.AddLast(tcs); - } - - Task winner; - if (timeout == Timeout.InfiniteTimeSpan) - { - using (cancellationToken.Register(() => { tcs.TrySetCanceled(); })) - { - await tcs.Task.ConfigureAwait(false); - winner = tcs.Task; - } - } - else - { - winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); - } - - var taskWasSignaled = winner == tcs.Task; - if (taskWasSignaled) - { - return true; - } - - // We timed-out; remove our reference to the task. - // This is an O(n) operation since waiters is a LinkedList. - lock (_waiters) - { - _waiters.Remove(tcs); - - if (winner.Status == TaskStatus.Canceled) - { - throw new OperationCanceledException(cancellationToken); - } - - throw new TimeoutException(); - } - } - - public void Set() - { - TaskCompletionSource toRelease = null; - - lock (_waiters) - { - if (_waiters.Count > 0) - { - // Signal the first task in the waiters list. - toRelease = _waiters.First.Value; - _waiters.RemoveFirst(); - } - else if (!_isSignaled) - { - // No tasks are pending - _isSignaled = true; - } - } - - toRelease?.TrySetResult(true); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs b/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs deleted file mode 100644 index d72712d..0000000 --- a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs +++ /dev/null @@ -1,237 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Internal; - -namespace MQTTnet.Tests -{ - [TestClass] - // Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) - public class AsyncAutoResetEvent_Tests - { - private readonly AsyncAutoResetEvent _aare; - - public AsyncAutoResetEvent_Tests() - { - _aare = new AsyncAutoResetEvent(); - } - - [TestMethod] - public async Task Cleanup_Waiters() - { - var @lock = new AsyncAutoResetEvent(); - - var waitOnePassed = false; - -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await @lock.WaitOneAsync(TimeSpan.FromSeconds(2)); - waitOnePassed = true; - }); - - await Task.Delay(500); - - Assert.AreEqual(1, @lock.WaitersCount); - - @lock.Set(); - - await Task.Delay(1000); - - Assert.IsTrue(waitOnePassed); - Assert.AreEqual(0, @lock.WaitersCount); - } - - [TestMethod] - public async Task SingleThreadedPulse() - { - for (int i = 0; i < 5; i++) - { - var t = _aare.WaitOneAsync(); - Assert.IsFalse(t.IsCompleted); - _aare.Set(); - await t; - Assert.IsTrue(t.IsCompleted); - } - } - - [TestMethod] - public async Task MultipleSetOnlySignalsOnce() - { - _aare.Set(); - _aare.Set(); - await _aare.WaitOneAsync(); - var t = _aare.WaitOneAsync(); - Assert.IsFalse(t.IsCompleted); - await Task.Delay(500); - Assert.IsFalse(t.IsCompleted); - _aare.Set(); - await t; - Assert.IsTrue(t.IsCompleted); - } - - [TestMethod] - public async Task OrderPreservingQueue() - { - var waiters = new Task[5]; - for (int i = 0; i < waiters.Length; i++) - { - waiters[i] = _aare.WaitOneAsync(); - } - - for (int i = 0; i < waiters.Length; i++) - { - _aare.Set(); - await waiters[i].ConfigureAwait(false); - } - } - - // This test does not work in appveyor but on local machine it does!? - /////// - /////// Verifies that inlining continuations do not have to complete execution before Set() returns. - /////// - ////[TestMethod] - ////public async Task SetReturnsBeforeInlinedContinuations() - ////{ - //// var setReturned = new ManualResetEventSlim(); - //// var inlinedContinuation = _aare.WaitOneAsync() - //// .ContinueWith(delegate - //// { - //// // Arrange to synchronously block the continuation until Set() has returned, - //// // which would deadlock if Set does not return until inlined continuations complete. - //// Assert.IsTrue(setReturned.Wait(500)); - //// }); - //// await Task.Delay(100); - //// _aare.Set(); - //// setReturned.Set(); - //// Assert.IsTrue(inlinedContinuation.Wait(500)); - ////} - - [TestMethod] - public void WaitAsync_WithCancellationToken() - { - var cts = new CancellationTokenSource(); - Task waitTask = _aare.WaitOneAsync(cts.Token); - Assert.IsFalse(waitTask.IsCompleted); - - // Cancel the request and ensure that it propagates to the task. - cts.Cancel(); - try - { - waitTask.GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a canceled state."); - } - catch (OperationCanceledException) - { - } - - // Now set the event and verify that a future waiter gets the signal immediately. - _aare.Set(); - waitTask = _aare.WaitOneAsync(); - Assert.AreEqual(TaskStatus.WaitingForActivation, waitTask.Status); - } - - [TestMethod] - public void WaitAsync_WithCancellationToken_Precanceled() - { - // We construct our own pre-canceled token so that we can do - // a meaningful identity check later. - var tokenSource = new CancellationTokenSource(); - tokenSource.Cancel(); - var token = tokenSource.Token; - - // Verify that a pre-set signal is not reset by a canceled wait request. - _aare.Set(); - try - { - _aare.WaitOneAsync(token).GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a canceled state."); - } - catch (OperationCanceledException ex) - { - Assert.AreEqual(token, ex.CancellationToken); - } - - // Verify that the signal was not acquired. - Task waitTask = _aare.WaitOneAsync(); - Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); - } - - [TestMethod] - public async Task WaitAsync_WithTimeout() - { - Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); - Assert.IsFalse(waitTask.IsCompleted); - - // Cancel the request and ensure that it propagates to the task. - await Task.Delay(1000).ConfigureAwait(false); - try - { - waitTask.GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a timeout state."); - } - catch (TimeoutException) - { - Assert.IsTrue(true); - } - - // Now set the event and verify that a future waiter gets the signal immediately. - _aare.Set(); - waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); - Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); - } - - [TestMethod] - public void WaitAsync_Canceled_DoesNotInlineContinuations() - { - var cts = new CancellationTokenSource(); - var task = _aare.WaitOneAsync(cts.Token); - - var completingActionFinished = new ManualResetEventSlim(); - var continuation = task.ContinueWith( - _ => Assert.IsTrue(completingActionFinished.Wait(500)), - CancellationToken.None, - TaskContinuationOptions.None, - TaskScheduler.Default); - - cts.Cancel(); - completingActionFinished.Set(); - - // Rethrow the exception if it turned out it deadlocked. - continuation.GetAwaiter().GetResult(); - } - - [TestMethod] - public async Task AsyncAutoResetEvent() - { - var aare = new AsyncAutoResetEvent(); - - var globalI = 0; -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await aare.WaitOneAsync(CancellationToken.None); - globalI += 1; - }); - -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await aare.WaitOneAsync(CancellationToken.None); - globalI += 2; - }); - - await Task.Delay(500); - aare.Set(); - await Task.Delay(500); - aare.Set(); - await Task.Delay(100); - - Assert.AreEqual(3, globalI); - } - } -} \ No newline at end of file