From 27a61819d3e8272f5dabee7c065d8c3b4804efec Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 4 Jan 2020 11:52:43 +0100 Subject: [PATCH 1/4] removed unused code --- .../MQTTnet/Internal/AsyncAutoResetEvent.cs | 131 ---------- .../AsyncAutoResentEvent_Tests.cs | 237 ------------------ 2 files changed, 368 deletions(-) delete mode 100644 Source/MQTTnet/Internal/AsyncAutoResetEvent.cs delete mode 100644 Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs diff --git a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs b/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs deleted file mode 100644 index cd62f07..0000000 --- a/Source/MQTTnet/Internal/AsyncAutoResetEvent.cs +++ /dev/null @@ -1,131 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace MQTTnet.Internal -{ - // Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) - public class AsyncAutoResetEvent - { - private readonly LinkedList> _waiters = new LinkedList>(); - - private bool _isSignaled; - - public AsyncAutoResetEvent() - : this(false) - { - } - - public AsyncAutoResetEvent(bool signaled) - { - _isSignaled = signaled; - } - - public int WaitersCount - { - get - { - lock (_waiters) - { - return _waiters.Count; - } - } - } - - public Task WaitOneAsync() - { - return WaitOneAsync(CancellationToken.None); - } - - public Task WaitOneAsync(TimeSpan timeout) - { - return WaitOneAsync(timeout, CancellationToken.None); - } - - public Task WaitOneAsync(CancellationToken cancellationToken) - { - return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); - } - - public async Task WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - TaskCompletionSource tcs; - - lock (_waiters) - { - if (_isSignaled) - { - _isSignaled = false; - return true; - } - - if (timeout == TimeSpan.Zero) - { - return _isSignaled; - } - - tcs = new TaskCompletionSource(); - _waiters.AddLast(tcs); - } - - Task winner; - if (timeout == Timeout.InfiniteTimeSpan) - { - using (cancellationToken.Register(() => { tcs.TrySetCanceled(); })) - { - await tcs.Task.ConfigureAwait(false); - winner = tcs.Task; - } - } - else - { - winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false); - } - - var taskWasSignaled = winner == tcs.Task; - if (taskWasSignaled) - { - return true; - } - - // We timed-out; remove our reference to the task. - // This is an O(n) operation since waiters is a LinkedList. - lock (_waiters) - { - _waiters.Remove(tcs); - - if (winner.Status == TaskStatus.Canceled) - { - throw new OperationCanceledException(cancellationToken); - } - - throw new TimeoutException(); - } - } - - public void Set() - { - TaskCompletionSource toRelease = null; - - lock (_waiters) - { - if (_waiters.Count > 0) - { - // Signal the first task in the waiters list. - toRelease = _waiters.First.Value; - _waiters.RemoveFirst(); - } - else if (!_isSignaled) - { - // No tasks are pending - _isSignaled = true; - } - } - - toRelease?.TrySetResult(true); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs b/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs deleted file mode 100644 index d72712d..0000000 --- a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEvent_Tests.cs +++ /dev/null @@ -1,237 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Internal; - -namespace MQTTnet.Tests -{ - [TestClass] - // Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) - public class AsyncAutoResetEvent_Tests - { - private readonly AsyncAutoResetEvent _aare; - - public AsyncAutoResetEvent_Tests() - { - _aare = new AsyncAutoResetEvent(); - } - - [TestMethod] - public async Task Cleanup_Waiters() - { - var @lock = new AsyncAutoResetEvent(); - - var waitOnePassed = false; - -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await @lock.WaitOneAsync(TimeSpan.FromSeconds(2)); - waitOnePassed = true; - }); - - await Task.Delay(500); - - Assert.AreEqual(1, @lock.WaitersCount); - - @lock.Set(); - - await Task.Delay(1000); - - Assert.IsTrue(waitOnePassed); - Assert.AreEqual(0, @lock.WaitersCount); - } - - [TestMethod] - public async Task SingleThreadedPulse() - { - for (int i = 0; i < 5; i++) - { - var t = _aare.WaitOneAsync(); - Assert.IsFalse(t.IsCompleted); - _aare.Set(); - await t; - Assert.IsTrue(t.IsCompleted); - } - } - - [TestMethod] - public async Task MultipleSetOnlySignalsOnce() - { - _aare.Set(); - _aare.Set(); - await _aare.WaitOneAsync(); - var t = _aare.WaitOneAsync(); - Assert.IsFalse(t.IsCompleted); - await Task.Delay(500); - Assert.IsFalse(t.IsCompleted); - _aare.Set(); - await t; - Assert.IsTrue(t.IsCompleted); - } - - [TestMethod] - public async Task OrderPreservingQueue() - { - var waiters = new Task[5]; - for (int i = 0; i < waiters.Length; i++) - { - waiters[i] = _aare.WaitOneAsync(); - } - - for (int i = 0; i < waiters.Length; i++) - { - _aare.Set(); - await waiters[i].ConfigureAwait(false); - } - } - - // This test does not work in appveyor but on local machine it does!? - /////// - /////// Verifies that inlining continuations do not have to complete execution before Set() returns. - /////// - ////[TestMethod] - ////public async Task SetReturnsBeforeInlinedContinuations() - ////{ - //// var setReturned = new ManualResetEventSlim(); - //// var inlinedContinuation = _aare.WaitOneAsync() - //// .ContinueWith(delegate - //// { - //// // Arrange to synchronously block the continuation until Set() has returned, - //// // which would deadlock if Set does not return until inlined continuations complete. - //// Assert.IsTrue(setReturned.Wait(500)); - //// }); - //// await Task.Delay(100); - //// _aare.Set(); - //// setReturned.Set(); - //// Assert.IsTrue(inlinedContinuation.Wait(500)); - ////} - - [TestMethod] - public void WaitAsync_WithCancellationToken() - { - var cts = new CancellationTokenSource(); - Task waitTask = _aare.WaitOneAsync(cts.Token); - Assert.IsFalse(waitTask.IsCompleted); - - // Cancel the request and ensure that it propagates to the task. - cts.Cancel(); - try - { - waitTask.GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a canceled state."); - } - catch (OperationCanceledException) - { - } - - // Now set the event and verify that a future waiter gets the signal immediately. - _aare.Set(); - waitTask = _aare.WaitOneAsync(); - Assert.AreEqual(TaskStatus.WaitingForActivation, waitTask.Status); - } - - [TestMethod] - public void WaitAsync_WithCancellationToken_Precanceled() - { - // We construct our own pre-canceled token so that we can do - // a meaningful identity check later. - var tokenSource = new CancellationTokenSource(); - tokenSource.Cancel(); - var token = tokenSource.Token; - - // Verify that a pre-set signal is not reset by a canceled wait request. - _aare.Set(); - try - { - _aare.WaitOneAsync(token).GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a canceled state."); - } - catch (OperationCanceledException ex) - { - Assert.AreEqual(token, ex.CancellationToken); - } - - // Verify that the signal was not acquired. - Task waitTask = _aare.WaitOneAsync(); - Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); - } - - [TestMethod] - public async Task WaitAsync_WithTimeout() - { - Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); - Assert.IsFalse(waitTask.IsCompleted); - - // Cancel the request and ensure that it propagates to the task. - await Task.Delay(1000).ConfigureAwait(false); - try - { - waitTask.GetAwaiter().GetResult(); - Assert.IsTrue(false, "Task was expected to transition to a timeout state."); - } - catch (TimeoutException) - { - Assert.IsTrue(true); - } - - // Now set the event and verify that a future waiter gets the signal immediately. - _aare.Set(); - waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); - Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); - } - - [TestMethod] - public void WaitAsync_Canceled_DoesNotInlineContinuations() - { - var cts = new CancellationTokenSource(); - var task = _aare.WaitOneAsync(cts.Token); - - var completingActionFinished = new ManualResetEventSlim(); - var continuation = task.ContinueWith( - _ => Assert.IsTrue(completingActionFinished.Wait(500)), - CancellationToken.None, - TaskContinuationOptions.None, - TaskScheduler.Default); - - cts.Cancel(); - completingActionFinished.Set(); - - // Rethrow the exception if it turned out it deadlocked. - continuation.GetAwaiter().GetResult(); - } - - [TestMethod] - public async Task AsyncAutoResetEvent() - { - var aare = new AsyncAutoResetEvent(); - - var globalI = 0; -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await aare.WaitOneAsync(CancellationToken.None); - globalI += 1; - }); - -#pragma warning disable 4014 - Task.Run(async () => -#pragma warning restore 4014 - { - await aare.WaitOneAsync(CancellationToken.None); - globalI += 2; - }); - - await Task.Delay(500); - aare.Set(); - await Task.Delay(500); - aare.Set(); - await Task.Delay(100); - - Assert.AreEqual(3, globalI); - } - } -} \ No newline at end of file From 138cd16c8dbeb4b5a3163bb44fadff5ef3240336 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 4 Jan 2020 11:53:15 +0100 Subject: [PATCH 2/4] fixed compilation --- .../Implementations/PlatformAbstractionLayer.cs | 12 ++++++++++++ Source/MQTTnet/Server/MqttRetainedMessagesManager.cs | 3 ++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs index a940eac..ee9057a 100644 --- a/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs +++ b/Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs @@ -88,5 +88,17 @@ namespace MQTTnet.Implementations #endif } + public static Task CompletedTask + { + get + { +#if NET452 + return Task.FromResult(0); +#else + return Task.CompletedTask; +#endif + } + } + } } diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs index c4e2f96..f4ebe48 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using MQTTnet.Diagnostics; +using MQTTnet.Implementations; using MQTTnet.Internal; namespace MQTTnet.Server @@ -21,7 +22,7 @@ namespace MQTTnet.Server if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); _options = options ?? throw new ArgumentNullException(nameof(options)); - return Task.CompletedTask; + return PlatformAbstractionLayer.CompletedTask; } public async Task LoadMessagesAsync() From a152066ed34bd5d8114cbbecf82d183316d84bfd Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 4 Jan 2020 11:54:41 +0100 Subject: [PATCH 3/4] fixed managed client so it does not send disconnect packet when disposed --- Source/MQTTnet/Internal/Disposable.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Internal/Disposable.cs b/Source/MQTTnet/Internal/Disposable.cs index f8a72b5..2ce3423 100644 --- a/Source/MQTTnet/Internal/Disposable.cs +++ b/Source/MQTTnet/Internal/Disposable.cs @@ -44,12 +44,13 @@ namespace MQTTnet.Internal { return; } + + _isDisposed = true; + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. Dispose(true); // TODO: uncomment the following line if the finalizer is overridden above. // GC.SuppressFinalize(this); - - _isDisposed = true; } #endregion } From 630400da53267e83ad1f6129137b23e3e300afc4 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 4 Jan 2020 11:55:12 +0100 Subject: [PATCH 4/4] avoid task.run on newer platforms --- .../PacketDispatcher/MqttPacketAwaiter.cs | 36 ++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs index 8f906f6..b172290 100644 --- a/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs +++ b/Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs @@ -9,7 +9,7 @@ namespace MQTTnet.PacketDispatcher { public sealed class MqttPacketAwaiter : Disposable, IMqttPacketAwaiter where TPacket : MqttBasePacket { - private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); + private readonly TaskCompletionSource _taskCompletionSource; private readonly ushort? _packetIdentifier; private readonly MqttPacketDispatcher _owningPacketDispatcher; @@ -17,13 +17,18 @@ namespace MQTTnet.PacketDispatcher { _packetIdentifier = packetIdentifier; _owningPacketDispatcher = owningPacketDispatcher ?? throw new ArgumentNullException(nameof(owningPacketDispatcher)); +#if NET452 + _taskCompletionSource = new TaskCompletionSource(); +#else + _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); +#endif } public async Task WaitOneAsync(TimeSpan timeout) { using (var timeoutToken = new CancellationTokenSource(timeout)) { - timeoutToken.Token.Register(() => _taskCompletionSource.TrySetException(new MqttCommunicationTimedOutException())); + timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException())); var packet = await _taskCompletionSource.Task.ConfigureAwait(false); return (TPacket)packet; @@ -33,24 +38,47 @@ namespace MQTTnet.PacketDispatcher public void Complete(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); - + + +#if NET452 // To prevent deadlocks it is required to call the _TrySetResult_ method // from a new thread because the awaiting code will not(!) be executed in // a new thread automatically (due to await). Furthermore _this_ thread will // do it. But _this_ thread is also reading incoming packets -> deadlock. + // NET452 does not support RunContinuationsAsynchronously Task.Run(() => _taskCompletionSource.TrySetResult(packet)); +#else + _taskCompletionSource.TrySetResult(packet); +#endif } public void Fail(Exception exception) { if (exception == null) throw new ArgumentNullException(nameof(exception)); - +#if NET452 + // To prevent deadlocks it is required to call the _TrySetResult_ method + // from a new thread because the awaiting code will not(!) be executed in + // a new thread automatically (due to await). Furthermore _this_ thread will + // do it. But _this_ thread is also reading incoming packets -> deadlock. + // NET452 does not support RunContinuationsAsynchronously Task.Run(() => _taskCompletionSource.TrySetException(exception)); +#else + _taskCompletionSource.TrySetException(exception); +#endif } public void Cancel() { +#if NET452 + // To prevent deadlocks it is required to call the _TrySetResult_ method + // from a new thread because the awaiting code will not(!) be executed in + // a new thread automatically (due to await). Furthermore _this_ thread will + // do it. But _this_ thread is also reading incoming packets -> deadlock. + // NET452 does not support RunContinuationsAsynchronously Task.Run(() => _taskCompletionSource.TrySetCanceled()); +#else + _taskCompletionSource.TrySetCanceled(); +#endif } protected override void Dispose(bool disposing)