@@ -8,31 +8,30 @@ namespace MQTTnet.Internal | |||||
// Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) | // Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490) | ||||
public class AsyncAutoResetEvent | public class AsyncAutoResetEvent | ||||
{ | { | ||||
private readonly LinkedList<TaskCompletionSource<bool>> waiters = new LinkedList<TaskCompletionSource<bool>>(); | |||||
private bool isSignaled; | |||||
private readonly LinkedList<TaskCompletionSource<bool>> _waiters = new LinkedList<TaskCompletionSource<bool>>(); | |||||
private bool _isSignaled; | |||||
public AsyncAutoResetEvent() : this(false) | public AsyncAutoResetEvent() : this(false) | ||||
{ } | { } | ||||
public AsyncAutoResetEvent(bool signaled) | public AsyncAutoResetEvent(bool signaled) | ||||
{ | { | ||||
this.isSignaled = signaled; | |||||
_isSignaled = signaled; | |||||
} | } | ||||
public Task<bool> WaitOneAsync() | public Task<bool> WaitOneAsync() | ||||
{ | { | ||||
return this.WaitOneAsync(CancellationToken.None); | |||||
return WaitOneAsync(CancellationToken.None); | |||||
} | } | ||||
public Task<bool> WaitOneAsync(TimeSpan timeout) | public Task<bool> WaitOneAsync(TimeSpan timeout) | ||||
{ | { | ||||
return this.WaitOneAsync(timeout, CancellationToken.None); | |||||
return WaitOneAsync(timeout, CancellationToken.None); | |||||
} | } | ||||
public Task<bool> WaitOneAsync(CancellationToken cancellationToken) | public Task<bool> WaitOneAsync(CancellationToken cancellationToken) | ||||
{ | { | ||||
return this.WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); | |||||
return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken); | |||||
} | } | ||||
public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) | public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken) | ||||
@@ -41,21 +40,21 @@ namespace MQTTnet.Internal | |||||
TaskCompletionSource<bool> tcs; | TaskCompletionSource<bool> tcs; | ||||
lock (this.waiters) | |||||
lock (_waiters) | |||||
{ | { | ||||
if (this.isSignaled) | |||||
if (_isSignaled) | |||||
{ | { | ||||
this.isSignaled = false; | |||||
_isSignaled = false; | |||||
return true; | return true; | ||||
} | } | ||||
else if (timeout == TimeSpan.Zero) | else if (timeout == TimeSpan.Zero) | ||||
{ | { | ||||
return this.isSignaled; | |||||
return _isSignaled; | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
tcs = new TaskCompletionSource<bool>(); | tcs = new TaskCompletionSource<bool>(); | ||||
this.waiters.AddLast(tcs); | |||||
_waiters.AddLast(tcs); | |||||
} | } | ||||
} | } | ||||
@@ -69,9 +68,9 @@ namespace MQTTnet.Internal | |||||
{ | { | ||||
// We timed-out; remove our reference to the task. | // We timed-out; remove our reference to the task. | ||||
// This is an O(n) operation since waiters is a LinkedList<T>. | // This is an O(n) operation since waiters is a LinkedList<T>. | ||||
lock (this.waiters) | |||||
lock (_waiters) | |||||
{ | { | ||||
this.waiters.Remove(tcs); | |||||
_waiters.Remove(tcs); | |||||
if (winner.Status == TaskStatus.Canceled) | if (winner.Status == TaskStatus.Canceled) | ||||
{ | { | ||||
throw new OperationCanceledException(cancellationToken); | throw new OperationCanceledException(cancellationToken); | ||||
@@ -88,25 +87,22 @@ namespace MQTTnet.Internal | |||||
{ | { | ||||
TaskCompletionSource<bool> toRelease = null; | TaskCompletionSource<bool> toRelease = null; | ||||
lock (this.waiters) | |||||
lock (_waiters) | |||||
{ | { | ||||
if (this.waiters.Count > 0) | |||||
if (_waiters.Count > 0) | |||||
{ | { | ||||
// Signal the first task in the waiters list. | // Signal the first task in the waiters list. | ||||
toRelease = this.waiters.First.Value; | |||||
this.waiters.RemoveFirst(); | |||||
toRelease = _waiters.First.Value; | |||||
_waiters.RemoveFirst(); | |||||
} | } | ||||
else if (!this.isSignaled) | |||||
else if (!_isSignaled) | |||||
{ | { | ||||
// No tasks are pending | // No tasks are pending | ||||
this.isSignaled = true; | |||||
_isSignaled = true; | |||||
} | } | ||||
} | } | ||||
if (toRelease != null) | |||||
{ | |||||
toRelease.SetResult(true); | |||||
} | |||||
toRelease?.SetResult(true); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -7,34 +7,42 @@ namespace MQTTnet.Internal | |||||
// From Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/12/building-async-coordination-primitives-part-6-asynclock/) | // From Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/12/building-async-coordination-primitives-part-6-asynclock/) | ||||
public sealed class AsyncLock : IDisposable | public sealed class AsyncLock : IDisposable | ||||
{ | { | ||||
private readonly SemaphoreSlim m_semaphore = new SemaphoreSlim(1, 1); | |||||
private readonly Task<IDisposable> m_releaser; | |||||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); | |||||
private readonly Task<IDisposable> _releaser; | |||||
public AsyncLock() | public AsyncLock() | ||||
{ | { | ||||
m_releaser = Task.FromResult((IDisposable)new Releaser(this)); | |||||
_releaser = Task.FromResult((IDisposable)new Releaser(this)); | |||||
} | } | ||||
public Task<IDisposable> LockAsync(CancellationToken cancellationToken) | public Task<IDisposable> LockAsync(CancellationToken cancellationToken) | ||||
{ | { | ||||
var wait = m_semaphore.WaitAsync(cancellationToken); | |||||
Task wait = _semaphore.WaitAsync(cancellationToken); | |||||
return wait.IsCompleted ? | return wait.IsCompleted ? | ||||
m_releaser : | |||||
_releaser : | |||||
wait.ContinueWith((_, state) => (IDisposable)state, | wait.ContinueWith((_, state) => (IDisposable)state, | ||||
m_releaser.Result, cancellationToken, | |||||
_releaser.Result, cancellationToken, | |||||
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | ||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
this.m_semaphore?.Dispose(); | |||||
_semaphore?.Dispose(); | |||||
} | } | ||||
private sealed class Releaser : IDisposable | private sealed class Releaser : IDisposable | ||||
{ | { | ||||
private readonly AsyncLock m_toRelease; | |||||
internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; } | |||||
public void Dispose() { m_toRelease.m_semaphore.Release(); } | |||||
private readonly AsyncLock _toRelease; | |||||
internal Releaser(AsyncLock toRelease) | |||||
{ | |||||
_toRelease = toRelease; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
_toRelease._semaphore.Release(); | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -119,7 +119,7 @@ namespace MQTTnet.Server | |||||
public async Task StopAsync() | public async Task StopAsync() | ||||
{ | { | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
foreach (var session in _sessions) | foreach (var session in _sessions) | ||||
{ | { | ||||
@@ -132,7 +132,7 @@ namespace MQTTnet.Server | |||||
public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync() | public async Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync() | ||||
{ | { | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient | return _sessions.Where(s => s.Value.IsConnected).Select(s => new ConnectedMqttClient | ||||
{ | { | ||||
@@ -155,7 +155,7 @@ namespace MQTTnet.Server | |||||
if (clientId == null) throw new ArgumentNullException(nameof(clientId)); | if (clientId == null) throw new ArgumentNullException(nameof(clientId)); | ||||
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
if (!_sessions.TryGetValue(clientId, out var session)) | if (!_sessions.TryGetValue(clientId, out var session)) | ||||
{ | { | ||||
@@ -171,7 +171,7 @@ namespace MQTTnet.Server | |||||
if (clientId == null) throw new ArgumentNullException(nameof(clientId)); | if (clientId == null) throw new ArgumentNullException(nameof(clientId)); | ||||
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
if (!_sessions.TryGetValue(clientId, out var session)) | if (!_sessions.TryGetValue(clientId, out var session)) | ||||
{ | { | ||||
@@ -205,8 +205,7 @@ namespace MQTTnet.Server | |||||
private async Task<GetOrCreateClientSessionResult> PrepareClientSessionAsync(MqttConnectPacket connectPacket) | private async Task<GetOrCreateClientSessionResult> PrepareClientSessionAsync(MqttConnectPacket connectPacket) | ||||
{ | { | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); | var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); | ||||
if (isSessionPresent) | if (isSessionPresent) | ||||
@@ -269,7 +268,7 @@ namespace MQTTnet.Server | |||||
_logger.Error(exception, "Error while processing application message"); | _logger.Error(exception, "Error while processing application message"); | ||||
} | } | ||||
using (var releaser = await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _sessionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
foreach (var clientSession in _sessions.Values) | foreach (var clientSession in _sessions.Values) | ||||
{ | { | ||||
@@ -29,7 +29,7 @@ namespace MQTTnet.Server | |||||
return; | return; | ||||
} | } | ||||
using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
@@ -52,7 +52,7 @@ namespace MQTTnet.Server | |||||
{ | { | ||||
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); | if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); | ||||
using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
@@ -69,7 +69,7 @@ namespace MQTTnet.Server | |||||
{ | { | ||||
var retainedMessages = new List<MqttApplicationMessage>(); | var retainedMessages = new List<MqttApplicationMessage>(); | ||||
using (var releaser = await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
{ | { | ||||
foreach (var retainedMessage in _messages.Values) | foreach (var retainedMessage in _messages.Values) | ||||
{ | { | ||||
@@ -10,11 +10,11 @@ namespace MQTTnet.Core.Tests | |||||
// Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) | // Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs) | ||||
public class AsyncAutoResetEventTests | public class AsyncAutoResetEventTests | ||||
{ | { | ||||
private readonly AsyncAutoResetEvent evt; | |||||
private readonly AsyncAutoResetEvent _aare; | |||||
public AsyncAutoResetEventTests() | public AsyncAutoResetEventTests() | ||||
{ | { | ||||
this.evt = new AsyncAutoResetEvent(); | |||||
_aare = new AsyncAutoResetEvent(); | |||||
} | } | ||||
[TestMethod] | [TestMethod] | ||||
@@ -22,9 +22,9 @@ namespace MQTTnet.Core.Tests | |||||
{ | { | ||||
for (int i = 0; i < 5; i++) | for (int i = 0; i < 5; i++) | ||||
{ | { | ||||
var t = this.evt.WaitOneAsync(); | |||||
var t = _aare.WaitOneAsync(); | |||||
Assert.IsFalse(t.IsCompleted); | Assert.IsFalse(t.IsCompleted); | ||||
this.evt.Set(); | |||||
_aare.Set(); | |||||
await t; | await t; | ||||
Assert.IsTrue(t.IsCompleted); | Assert.IsTrue(t.IsCompleted); | ||||
} | } | ||||
@@ -33,14 +33,14 @@ namespace MQTTnet.Core.Tests | |||||
[TestMethod] | [TestMethod] | ||||
public async Task MultipleSetOnlySignalsOnce() | public async Task MultipleSetOnlySignalsOnce() | ||||
{ | { | ||||
this.evt.Set(); | |||||
this.evt.Set(); | |||||
await this.evt.WaitOneAsync(); | |||||
var t = this.evt.WaitOneAsync(); | |||||
_aare.Set(); | |||||
_aare.Set(); | |||||
await _aare.WaitOneAsync(); | |||||
var t = _aare.WaitOneAsync(); | |||||
Assert.IsFalse(t.IsCompleted); | Assert.IsFalse(t.IsCompleted); | ||||
await Task.Delay(500); | await Task.Delay(500); | ||||
Assert.IsFalse(t.IsCompleted); | Assert.IsFalse(t.IsCompleted); | ||||
this.evt.Set(); | |||||
_aare.Set(); | |||||
await t; | await t; | ||||
Assert.IsTrue(t.IsCompleted); | Assert.IsTrue(t.IsCompleted); | ||||
} | } | ||||
@@ -51,12 +51,12 @@ namespace MQTTnet.Core.Tests | |||||
var waiters = new Task[5]; | var waiters = new Task[5]; | ||||
for (int i = 0; i < waiters.Length; i++) | for (int i = 0; i < waiters.Length; i++) | ||||
{ | { | ||||
waiters[i] = this.evt.WaitOneAsync(); | |||||
waiters[i] = _aare.WaitOneAsync(); | |||||
} | } | ||||
for (int i = 0; i < waiters.Length; i++) | for (int i = 0; i < waiters.Length; i++) | ||||
{ | { | ||||
this.evt.Set(); | |||||
_aare.Set(); | |||||
await waiters[i].ConfigureAwait(false); | await waiters[i].ConfigureAwait(false); | ||||
} | } | ||||
} | } | ||||
@@ -68,7 +68,7 @@ namespace MQTTnet.Core.Tests | |||||
public async Task SetReturnsBeforeInlinedContinuations() | public async Task SetReturnsBeforeInlinedContinuations() | ||||
{ | { | ||||
var setReturned = new ManualResetEventSlim(); | var setReturned = new ManualResetEventSlim(); | ||||
var inlinedContinuation = this.evt.WaitOneAsync() | |||||
var inlinedContinuation = _aare.WaitOneAsync() | |||||
.ContinueWith(delegate | .ContinueWith(delegate | ||||
{ | { | ||||
// Arrange to synchronously block the continuation until Set() has returned, | // Arrange to synchronously block the continuation until Set() has returned, | ||||
@@ -76,7 +76,7 @@ namespace MQTTnet.Core.Tests | |||||
Assert.IsTrue(setReturned.Wait(500)); | Assert.IsTrue(setReturned.Wait(500)); | ||||
}); | }); | ||||
await Task.Delay(100); | await Task.Delay(100); | ||||
this.evt.Set(); | |||||
_aare.Set(); | |||||
setReturned.Set(); | setReturned.Set(); | ||||
Assert.IsTrue(inlinedContinuation.Wait(500)); | Assert.IsTrue(inlinedContinuation.Wait(500)); | ||||
} | } | ||||
@@ -85,7 +85,7 @@ namespace MQTTnet.Core.Tests | |||||
public void WaitAsync_WithCancellationToken() | public void WaitAsync_WithCancellationToken() | ||||
{ | { | ||||
var cts = new CancellationTokenSource(); | var cts = new CancellationTokenSource(); | ||||
Task waitTask = this.evt.WaitOneAsync(cts.Token); | |||||
Task waitTask = _aare.WaitOneAsync(cts.Token); | |||||
Assert.IsFalse(waitTask.IsCompleted); | Assert.IsFalse(waitTask.IsCompleted); | ||||
// Cancel the request and ensure that it propagates to the task. | // Cancel the request and ensure that it propagates to the task. | ||||
@@ -95,14 +95,14 @@ namespace MQTTnet.Core.Tests | |||||
waitTask.GetAwaiter().GetResult(); | waitTask.GetAwaiter().GetResult(); | ||||
Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | ||||
} | } | ||||
catch (System.OperationCanceledException ex) | |||||
catch (OperationCanceledException ex) | |||||
{ | { | ||||
Assert.AreEqual(cts.Token, ex.CancellationToken); | Assert.AreEqual(cts.Token, ex.CancellationToken); | ||||
} | } | ||||
// Now set the event and verify that a future waiter gets the signal immediately. | // Now set the event and verify that a future waiter gets the signal immediately. | ||||
this.evt.Set(); | |||||
waitTask = this.evt.WaitOneAsync(); | |||||
_aare.Set(); | |||||
waitTask = _aare.WaitOneAsync(); | |||||
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | ||||
} | } | ||||
@@ -116,10 +116,10 @@ namespace MQTTnet.Core.Tests | |||||
var token = tokenSource.Token; | var token = tokenSource.Token; | ||||
// Verify that a pre-set signal is not reset by a canceled wait request. | // Verify that a pre-set signal is not reset by a canceled wait request. | ||||
this.evt.Set(); | |||||
_aare.Set(); | |||||
try | try | ||||
{ | { | ||||
this.evt.WaitOneAsync(token).GetAwaiter().GetResult(); | |||||
_aare.WaitOneAsync(token).GetAwaiter().GetResult(); | |||||
Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | Assert.IsTrue(false, "Task was expected to transition to a canceled state."); | ||||
} | } | ||||
catch (OperationCanceledException ex) | catch (OperationCanceledException ex) | ||||
@@ -128,14 +128,14 @@ namespace MQTTnet.Core.Tests | |||||
} | } | ||||
// Verify that the signal was not acquired. | // Verify that the signal was not acquired. | ||||
Task waitTask = this.evt.WaitOneAsync(); | |||||
Task waitTask = _aare.WaitOneAsync(); | |||||
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | ||||
} | } | ||||
[TestMethod] | [TestMethod] | ||||
public async Task WaitAsync_WithTimeout() | public async Task WaitAsync_WithTimeout() | ||||
{ | { | ||||
Task waitTask = this.evt.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
Assert.IsFalse(waitTask.IsCompleted); | Assert.IsFalse(waitTask.IsCompleted); | ||||
// Cancel the request and ensure that it propagates to the task. | // Cancel the request and ensure that it propagates to the task. | ||||
@@ -145,14 +145,14 @@ namespace MQTTnet.Core.Tests | |||||
waitTask.GetAwaiter().GetResult(); | waitTask.GetAwaiter().GetResult(); | ||||
Assert.IsTrue(false, "Task was expected to transition to a timeout state."); | Assert.IsTrue(false, "Task was expected to transition to a timeout state."); | ||||
} | } | ||||
catch (System.TimeoutException) | |||||
catch (TimeoutException) | |||||
{ | { | ||||
Assert.IsTrue(true); | Assert.IsTrue(true); | ||||
} | } | ||||
// Now set the event and verify that a future waiter gets the signal immediately. | // Now set the event and verify that a future waiter gets the signal immediately. | ||||
this.evt.Set(); | |||||
waitTask = this.evt.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
_aare.Set(); | |||||
waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500)); | |||||
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status); | ||||
} | } | ||||
@@ -160,7 +160,7 @@ namespace MQTTnet.Core.Tests | |||||
public void WaitAsync_Canceled_DoesNotInlineContinuations() | public void WaitAsync_Canceled_DoesNotInlineContinuations() | ||||
{ | { | ||||
var cts = new CancellationTokenSource(); | var cts = new CancellationTokenSource(); | ||||
var task = this.evt.WaitOneAsync(cts.Token); | |||||
var task = _aare.WaitOneAsync(cts.Token); | |||||
var completingActionFinished = new ManualResetEventSlim(); | var completingActionFinished = new ManualResetEventSlim(); | ||||
var continuation = task.ContinueWith( | var continuation = task.ContinueWith( | ||||