diff --git a/Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs b/Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs index a8fa852..f20733d 100644 --- a/Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs +++ b/Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs @@ -8,31 +8,30 @@ namespace MQTTnet.Internal // Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) public class AsyncAutoResetEvent { - private readonly LinkedList> waiters = new LinkedList>(); - - private bool isSignaled; + private readonly LinkedList> _waiters = new LinkedList>(); + private bool _isSignaled; public AsyncAutoResetEvent() : this(false) { } public AsyncAutoResetEvent(bool signaled) { - this.isSignaled = signaled; + _isSignaled = signaled; } public Task WaitOneAsync() { - return this.WaitOneAsync(CancellationToken.None); + return WaitOneAsync(CancellationToken.None); } public Task WaitOneAsync(TimeSpan timeout) { - return this.WaitOneAsync(timeout, CancellationToken.None); + return WaitOneAsync(timeout, CancellationToken.None); } public Task WaitOneAsync(CancellationToken cancellationToken) { - return this.WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); + return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); } public async Task WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) @@ -41,21 +40,21 @@ namespace MQTTnet.Internal TaskCompletionSource tcs; - lock (this.waiters) + lock (_waiters) { - if (this.isSignaled) + if (_isSignaled) { - this.isSignaled = false; + _isSignaled = false; return true; } else if (timeout == TimeSpan.Zero) { - return this.isSignaled; + return _isSignaled; } else { tcs = new TaskCompletionSource(); - this.waiters.AddLast(tcs); + _waiters.AddLast(tcs); } } @@ -69,9 +68,9 @@ namespace MQTTnet.Internal { // We timed-out; remove our reference to the task. // This is an O(n) operation since waiters is a LinkedList. - lock (this.waiters) + lock (_waiters) { - this.waiters.Remove(tcs); + _waiters.Remove(tcs); if (winner.Status == TaskStatus.Canceled) { throw new OperationCanceledException(cancellationToken); @@ -88,25 +87,22 @@ namespace MQTTnet.Internal { TaskCompletionSource toRelease = null; - lock (this.waiters) + lock (_waiters) { - if (this.waiters.Count > 0) + if (_waiters.Count > 0) { // Signal the first task in the waiters list. - toRelease = this.waiters.First.Value; - this.waiters.RemoveFirst(); + toRelease = _waiters.First.Value; + _waiters.RemoveFirst(); } - else if (!this.isSignaled) + else if (!_isSignaled) { // No tasks are pending - this.isSignaled = true; + _isSignaled = true; } } - if (toRelease != null) - { - toRelease.SetResult(true); - } + toRelease?.SetResult(true); } } } diff --git a/Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs b/Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs index 5e60a6a..1de1b1a 100644 --- a/Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs +++ b/Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs @@ -7,34 +7,42 @@ namespace MQTTnet.Internal // From Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/12/building-async-coordination-primitives-part-6-asynclock/) public sealed class AsyncLock : IDisposable { - private readonly SemaphoreSlim m_semaphore = new SemaphoreSlim(1, 1); - private readonly Task m_releaser; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private readonly Task _releaser; public AsyncLock() { - m_releaser = Task.FromResult((IDisposable)new Releaser(this)); + _releaser = Task.FromResult((IDisposable)new Releaser(this)); } public Task LockAsync(CancellationToken cancellationToken) { - var wait = m_semaphore.WaitAsync(cancellationToken); + Task wait = _semaphore.WaitAsync(cancellationToken); return wait.IsCompleted ? - m_releaser : + _releaser : wait.ContinueWith((_, state) => (IDisposable)state, - m_releaser.Result, cancellationToken, + _releaser.Result, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } public void Dispose() { - this.m_semaphore?.Dispose(); + _semaphore?.Dispose(); } private sealed class Releaser : IDisposable { - private readonly AsyncLock m_toRelease; - internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; } - public void Dispose() { m_toRelease.m_semaphore.Release(); } + private readonly AsyncLock _toRelease; + + internal Releaser(AsyncLock toRelease) + { + _toRelease = toRelease; + } + + public void Dispose() + { + _toRelease._semaphore.Release(); + } } } } diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs index e47f3f3..9281a97 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs @@ -119,7 +119,7 @@ namespace MQTTnet.Server public async Task StopAsync() { - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { foreach (var session in _sessions) { @@ -132,7 +132,7 @@ namespace MQTTnet.Server public async Task> GetConnectedClientsAsync() { - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient { @@ -155,7 +155,7 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { if (!_sessions.TryGetValue(clientId, out var session)) { @@ -171,7 +171,7 @@ namespace MQTTnet.Server if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { if (!_sessions.TryGetValue(clientId, out var session)) { @@ -205,8 +205,7 @@ namespace MQTTnet.Server private async Task PrepareClientSessionAsync(MqttConnectPacket connectPacket) { - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) - + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); if (isSessionPresent) @@ -269,7 +268,7 @@ namespace MQTTnet.Server _logger.Error(exception, "Error while processing application message"); } - using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { foreach (var clientSession in _sessions.Values) { diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs index e00ff44..89765b7 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Server return; } - using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { try { @@ -52,7 +52,7 @@ namespace MQTTnet.Server { if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); - using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { try { @@ -69,7 +69,7 @@ namespace MQTTnet.Server { var retainedMessages = new List(); - using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { foreach (var retainedMessage in _messages.Values) { diff --git a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs b/Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs index a4909b4..7087ff0 100644 --- a/Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs +++ b/Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs @@ -10,11 +10,11 @@ namespace MQTTnet.Core.Tests // Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) public class AsyncAutoResetEventTests { - private readonly AsyncAutoResetEvent evt; + private readonly AsyncAutoResetEvent _aare; public AsyncAutoResetEventTests() { - this.evt = new AsyncAutoResetEvent(); + _aare = new AsyncAutoResetEvent(); } [TestMethod] @@ -22,9 +22,9 @@ namespace MQTTnet.Core.Tests { for (int i = 0; i < 5; i++) { - var t = this.evt.WaitOneAsync(); + var t = _aare.WaitOneAsync(); Assert.IsFalse(t.IsCompleted); - this.evt.Set(); + _aare.Set(); await t; Assert.IsTrue(t.IsCompleted); } @@ -33,14 +33,14 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task MultipleSetOnlySignalsOnce() { - this.evt.Set(); - this.evt.Set(); - await this.evt.WaitOneAsync(); - var t = this.evt.WaitOneAsync(); + _aare.Set(); + _aare.Set(); + await _aare.WaitOneAsync(); + var t = _aare.WaitOneAsync(); Assert.IsFalse(t.IsCompleted); await Task.Delay(500); Assert.IsFalse(t.IsCompleted); - this.evt.Set(); + _aare.Set(); await t; Assert.IsTrue(t.IsCompleted); } @@ -51,12 +51,12 @@ namespace MQTTnet.Core.Tests var waiters = new Task[5]; for (int i = 0; i < waiters.Length; i++) { - waiters[i] = this.evt.WaitOneAsync(); + waiters[i] = _aare.WaitOneAsync(); } for (int i = 0; i < waiters.Length; i++) { - this.evt.Set(); + _aare.Set(); await waiters[i].ConfigureAwait(false); } } @@ -68,7 +68,7 @@ namespace MQTTnet.Core.Tests public async Task SetReturnsBeforeInlinedContinuations() { var setReturned = new ManualResetEventSlim(); - var inlinedContinuation = this.evt.WaitOneAsync() + var inlinedContinuation = _aare.WaitOneAsync() .ContinueWith(delegate { // Arrange to synchronously block the continuation until Set() has returned, @@ -76,7 +76,7 @@ namespace MQTTnet.Core.Tests Assert.IsTrue(setReturned.Wait(500)); }); await Task.Delay(100); - this.evt.Set(); + _aare.Set(); setReturned.Set(); Assert.IsTrue(inlinedContinuation.Wait(500)); } @@ -85,7 +85,7 @@ namespace MQTTnet.Core.Tests public void WaitAsync_WithCancellationToken() { var cts = new CancellationTokenSource(); - Task waitTask = this.evt.WaitOneAsync(cts.Token); + Task waitTask = _aare.WaitOneAsync(cts.Token); Assert.IsFalse(waitTask.IsCompleted); // Cancel the request and ensure that it propagates to the task. @@ -95,14 +95,14 @@ namespace MQTTnet.Core.Tests waitTask.GetAwaiter().GetResult(); Assert.IsTrue(false, "Task was expected to transition to a canceled state."); } - catch (System.OperationCanceledException ex) + catch (OperationCanceledException ex) { Assert.AreEqual(cts.Token, ex.CancellationToken); } // Now set the event and verify that a future waiter gets the signal immediately. - this.evt.Set(); - waitTask = this.evt.WaitOneAsync(); + _aare.Set(); + waitTask = _aare.WaitOneAsync(); Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); } @@ -116,10 +116,10 @@ namespace MQTTnet.Core.Tests var token = tokenSource.Token; // Verify that a pre-set signal is not reset by a canceled wait request. - this.evt.Set(); + _aare.Set(); try { - this.evt.WaitOneAsync(token).GetAwaiter().GetResult(); + _aare.WaitOneAsync(token).GetAwaiter().GetResult(); Assert.IsTrue(false, "Task was expected to transition to a canceled state."); } catch (OperationCanceledException ex) @@ -128,14 +128,14 @@ namespace MQTTnet.Core.Tests } // Verify that the signal was not acquired. - Task waitTask = this.evt.WaitOneAsync(); + Task waitTask = _aare.WaitOneAsync(); Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); } [TestMethod] public async Task WaitAsync_WithTimeout() { - Task waitTask = this.evt.WaitOneAsync(TimeSpan.FromMilliseconds(500)); + Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); Assert.IsFalse(waitTask.IsCompleted); // Cancel the request and ensure that it propagates to the task. @@ -145,14 +145,14 @@ namespace MQTTnet.Core.Tests waitTask.GetAwaiter().GetResult(); Assert.IsTrue(false, "Task was expected to transition to a timeout state."); } - catch (System.TimeoutException) + catch (TimeoutException) { Assert.IsTrue(true); } // Now set the event and verify that a future waiter gets the signal immediately. - this.evt.Set(); - waitTask = this.evt.WaitOneAsync(TimeSpan.FromMilliseconds(500)); + _aare.Set(); + waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); } @@ -160,7 +160,7 @@ namespace MQTTnet.Core.Tests public void WaitAsync_Canceled_DoesNotInlineContinuations() { var cts = new CancellationTokenSource(); - var task = this.evt.WaitOneAsync(cts.Token); + var task = _aare.WaitOneAsync(cts.Token); var completingActionFinished = new ManualResetEventSlim(); var continuation = task.ContinueWith(