Browse Source

Merge pull request #270 from sebastienwarin/develop

Replace the Async coordination primitives & replace the MqttTopicFilterComparer to improve the performance
release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
d72ac9694e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 503 additions and 66 deletions
  1. +90
    -8
      Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs
  2. +26
    -4
      Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs
  3. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  4. +3
    -8
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  5. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs
  6. +2
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  7. +98
    -28
      Frameworks/MQTTnet.NetStandard/Server/MqttTopicFilterComparer.cs
  8. +1
    -0
      Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj
  9. +4
    -0
      Tests/MQTTnet.Benchmarks/Program.cs
  10. +91
    -0
      Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs
  11. +184
    -8
      Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs
  12. +2
    -7
      Tests/MQTTnet.Core.Tests/AsyncLockTests.cs

+ 90
- 8
Frameworks/MQTTnet.NetStandard/Internal/AsyncAutoResetEvent.cs View File

@@ -1,26 +1,108 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Internal
{
public sealed class AsyncAutoResetEvent : IDisposable
// Inspired from Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/) and Chris Gillum (https://stackoverflow.com/a/43012490)
public class AsyncAutoResetEvent
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);
private readonly LinkedList<TaskCompletionSource<bool>> _waiters = new LinkedList<TaskCompletionSource<bool>>();
private bool _isSignaled;

public Task WaitOneAsync(CancellationToken cancellationToken)
public AsyncAutoResetEvent() : this(false)
{ }

public AsyncAutoResetEvent(bool signaled)
{
return _semaphore.WaitAsync(cancellationToken);
_isSignaled = signaled;
}

public void Set()
public Task<bool> WaitOneAsync()
{
return WaitOneAsync(CancellationToken.None);
}

public Task<bool> WaitOneAsync(TimeSpan timeout)
{
_semaphore.Release();
return WaitOneAsync(timeout, CancellationToken.None);
}

public void Dispose()
public Task<bool> WaitOneAsync(CancellationToken cancellationToken)
{
_semaphore?.Dispose();
return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken);
}

public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

TaskCompletionSource<bool> tcs;

lock (_waiters)
{
if (_isSignaled)
{
_isSignaled = false;
return true;
}
else if (timeout == TimeSpan.Zero)
{
return _isSignaled;
}
else
{
tcs = new TaskCompletionSource<bool>();
_waiters.AddLast(tcs);
}
}

Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken)).ConfigureAwait(false);
if (winner == tcs.Task)
{
// The task was signaled.
return true;
}
else
{
// We timed-out; remove our reference to the task.
// This is an O(n) operation since waiters is a LinkedList<T>.
lock (_waiters)
{
_waiters.Remove(tcs);
if (winner.Status == TaskStatus.Canceled)
{
throw new OperationCanceledException(cancellationToken);
}
else
{
throw new TimeoutException();
}
}
}
}

public void Set()
{
TaskCompletionSource<bool> toRelease = null;

lock (_waiters)
{
if (_waiters.Count > 0)
{
// Signal the first task in the waiters list.
toRelease = _waiters.First.Value;
_waiters.RemoveFirst();
}
else if (!_isSignaled)
{
// No tasks are pending
_isSignaled = true;
}
}

toRelease?.SetResult(true);
}
}
}

+ 26
- 4
Frameworks/MQTTnet.NetStandard/Internal/AsyncLock.cs View File

@@ -4,23 +4,45 @@ using System.Threading.Tasks;

namespace MQTTnet.Internal
{
// From Stephen Toub (https://blogs.msdn.microsoft.com/pfxteam/2012/02/12/building-async-coordination-primitives-part-6-asynclock/)
public sealed class AsyncLock : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Task<IDisposable> _releaser;

public Task EnterAsync(CancellationToken cancellationToken)
public AsyncLock()
{
return _semaphore.WaitAsync(cancellationToken);
_releaser = Task.FromResult((IDisposable)new Releaser(this));
}

public void Exit()
public Task<IDisposable> LockAsync(CancellationToken cancellationToken)
{
_semaphore.Release();
Task wait = _semaphore.WaitAsync(cancellationToken);
return wait.IsCompleted ?
_releaser :
wait.ContinueWith((_, state) => (IDisposable)state,
_releaser.Result, cancellationToken,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

public void Dispose()
{
_semaphore?.Dispose();
}

private sealed class Releaser : IDisposable
{
private readonly AsyncLock _toRelease;

internal Releaser(AsyncLock toRelease)
{
_toRelease = toRelease;
}

public void Dispose()
{
_toRelease._semaphore.Release();
}
}
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -75,7 +75,7 @@ namespace MQTTnet.Server

public void Dispose()
{
_queueAutoResetEvent?.Dispose();
}

private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)


+ 3
- 8
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -136,9 +136,9 @@ namespace MQTTnet.Server
return Task.FromResult((IList<IMqttClientSessionStatus>)result);
}

public void StartDispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
public Task StartDispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
Task.Run(() => DispatchApplicationMessageAsync(senderClientSession, applicationMessage));
return DispatchApplicationMessageAsync(senderClientSession, applicationMessage);
}

public Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters)
@@ -197,8 +197,7 @@ namespace MQTTnet.Server

private async Task<GetOrCreateClientSessionResult> PrepareClientSessionAsync(MqttConnectPacket connectPacket)
{
await _sessionPreparationLock.EnterAsync(CancellationToken.None).ConfigureAwait(false);
try
using (await _sessionPreparationLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession);
if (isSessionPresent)
@@ -232,10 +231,6 @@ namespace MQTTnet.Server

return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession };
}
finally
{
_sessionPreparationLock.Exit();
}
}

private async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttRetainedMessagesManager.cs View File

@@ -82,7 +82,7 @@ namespace MQTTnet.Server
{
var saveIsRequired = false;

if (applicationMessage.Payload?.Length > 0)
if (applicationMessage.Payload?.Length == 0)
{
saveIsRequired = _messages.TryRemove(applicationMessage.Topic, out _);
_logger.Info("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);


+ 2
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -93,7 +93,7 @@ namespace MQTTnet.Server
}

_logger.Info("Started.");
Started?.Invoke(this, new MqttServerStartedEventArgs());
Started?.Invoke(this, EventArgs.Empty);
}

public async Task StopAsync()
@@ -117,6 +117,7 @@ namespace MQTTnet.Server
await _clientSessionsManager.StopAsync().ConfigureAwait(false);

_logger.Info("Stopped.");
Stopped?.Invoke(this, EventArgs.Empty);
}
finally
{


+ 98
- 28
Frameworks/MQTTnet.NetStandard/Server/MqttTopicFilterComparer.cs View File

@@ -4,46 +4,116 @@ namespace MQTTnet.Server
{
public static class MqttTopicFilterComparer
{
private static readonly char[] TopicLevelSeparator = { '/' };
private const char LEVEL_SEPARATOR = '/';
private const char WILDCARD_MULTI_LEVEL = '#';
private const char WILDCARD_SINGLE_LEVEL = '+';

public static bool IsMatch(string topic, string filter)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (filter == null) throw new ArgumentNullException(nameof(filter));
if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic));
if (string.IsNullOrEmpty(filter)) throw new ArgumentNullException(nameof(filter));

if (string.Equals(topic, filter, StringComparison.Ordinal))
{
return true;
}

var fragmentsTopic = topic.Split(TopicLevelSeparator, StringSplitOptions.None);
var fragmentsFilter = filter.Split(TopicLevelSeparator, StringSplitOptions.None);
int spos = 0;
int slen = filter.Length;
int tpos = 0;
int tlen = topic.Length;

// # > In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-2].
for (var i = 0; i < fragmentsFilter.Length; i++)
while (spos < slen && tpos < tlen)
{
if (fragmentsFilter[i] == "+")
{
continue;
}

if (fragmentsFilter[i] == "#")
if (filter[spos] == topic[tpos])
{
return true;
if (tpos == tlen - 1)
{
/* Check for e.g. foo matching foo/# */
if (spos == slen - 3
&& filter[spos + 1] == LEVEL_SEPARATOR
&& filter[spos + 2] == WILDCARD_MULTI_LEVEL)
{
return true;
}
}
spos++;
tpos++;
if (spos == slen && tpos == tlen)
{
return true;
}
else if (tpos == tlen && spos == slen - 1 && filter[spos] == WILDCARD_SINGLE_LEVEL)
{
if (spos > 0 && filter[spos - 1] != LEVEL_SEPARATOR)
{
// Invalid filter string
return false;
}
spos++;
return true;
}
}

if (i >= fragmentsTopic.Length)
{
return false;
}

if (!string.Equals(fragmentsFilter[i], fragmentsTopic[i], StringComparison.Ordinal))
else
{
return false;
if (filter[spos] == WILDCARD_SINGLE_LEVEL)
{
/* Check for bad "+foo" or "a/+foo" subscription */
if (spos > 0 && filter[spos - 1] != LEVEL_SEPARATOR)
{
// Invalid filter string
return false;
}
/* Check for bad "foo+" or "foo+/a" subscription */
if (spos < slen - 1 && filter[spos + 1] != LEVEL_SEPARATOR)
{
// Invalid filter string
return false;
}
spos++;
while (tpos < tlen && topic[tpos] != LEVEL_SEPARATOR)
{
tpos++;
}
if (tpos == tlen && spos == slen)
{
return true;
}
}
else if (filter[spos] == WILDCARD_MULTI_LEVEL)
{
if (spos > 0 && filter[spos - 1] != LEVEL_SEPARATOR)
{
// Invalid filter string
return false;
}
if (spos + 1 != slen)
{
// Invalid filter string
return false;
}
else
{
return true;
}
}
else
{
/* Check for e.g. foo/bar matching foo/+/# */
if (spos > 0
&& spos + 2 == slen
&& tpos == tlen
&& filter[spos - 1] == WILDCARD_SINGLE_LEVEL
&& filter[spos] == LEVEL_SEPARATOR
&& filter[spos + 1] == WILDCARD_MULTI_LEVEL)
{
return true;
}
return false;
}
}
}
if (tpos < tlen || spos < slen)
{
return false;
}

return fragmentsTopic.Length == fragmentsFilter.Length;
return false;
}
}
}

+ 1
- 0
Tests/MQTTnet.Benchmarks/MQTTnet.Benchmarks.csproj View File

@@ -147,6 +147,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="TopicFilterComparerBenchmark.cs" />
<Compile Include="LoggerBenchmark.cs" />
<Compile Include="MessageProcessingBenchmark.cs" />
<Compile Include="Program.cs" />


+ 4
- 0
Tests/MQTTnet.Benchmarks/Program.cs View File

@@ -12,6 +12,7 @@ namespace MQTTnet.Benchmarks
Console.WriteLine("1 = MessageProcessingBenchmark");
Console.WriteLine("2 = SerializerBenchmark");
Console.WriteLine("3 = LoggerBenchmark");
Console.WriteLine("4 = TopicFilterComparerBenchmark");

var pressedKey = Console.ReadKey(true);
switch (pressedKey.KeyChar)
@@ -25,6 +26,9 @@ namespace MQTTnet.Benchmarks
case '3':
BenchmarkRunner.Run<LoggerBenchmark>();
break;
case '4':
BenchmarkRunner.Run<TopicFilterComparerBenchmark>();
break;
}

Console.ReadLine();


+ 91
- 0
Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs View File

@@ -0,0 +1,91 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Attributes.Exporters;
using BenchmarkDotNet.Attributes.Jobs;
using MQTTnet.Server;
using System;

namespace MQTTnet.Benchmarks
{
[ClrJob]
[RPlotExporter]
[MemoryDiagnoser]
public class TopicFilterComparerBenchmark
{
private static readonly char[] TopicLevelSeparator = { '/' };

[GlobalSetup]
public void Setup()
{
}

[Benchmark]
public void MqttTopicFilterComparer_10000_StringSplitMethod()
{
for (var i = 0; i < 10000; i++)
{
LegacyMethodByStringSplit("sport/tennis/player1", "sport/#");
LegacyMethodByStringSplit("sport/tennis/player1/ranking", "sport/#/ranking");
LegacyMethodByStringSplit("sport/tennis/player1/score/wimbledon", "sport/+/player1/#");
LegacyMethodByStringSplit("sport/tennis/player1", "sport/tennis/+");
LegacyMethodByStringSplit("/finance", "+/+");
LegacyMethodByStringSplit("/finance", "/+");
LegacyMethodByStringSplit("/finance", "+");
}
}

[Benchmark]
public void MqttTopicFilterComparer_10000_LoopMethod()
{
for (var i = 0; i < 10000; i++)
{
MqttTopicFilterComparer.IsMatch("sport/tennis/player1", "sport/#");
MqttTopicFilterComparer.IsMatch("sport/tennis/player1/ranking", "sport/#/ranking");
MqttTopicFilterComparer.IsMatch("sport/tennis/player1/score/wimbledon", "sport/+/player1/#");
MqttTopicFilterComparer.IsMatch("sport/tennis/player1", "sport/tennis/+");
MqttTopicFilterComparer.IsMatch("/finance", "+/+");
MqttTopicFilterComparer.IsMatch("/finance", "/+");
MqttTopicFilterComparer.IsMatch("/finance", "+");
}
}

private static bool LegacyMethodByStringSplit(string topic, string filter)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (filter == null) throw new ArgumentNullException(nameof(filter));

if (string.Equals(topic, filter, StringComparison.Ordinal))
{
return true;
}

var fragmentsTopic = topic.Split(TopicLevelSeparator, StringSplitOptions.None);
var fragmentsFilter = filter.Split(TopicLevelSeparator, StringSplitOptions.None);

// # > In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-2].
for (var i = 0; i < fragmentsFilter.Length; i++)
{
if (fragmentsFilter[i] == "+")
{
continue;
}

if (fragmentsFilter[i] == "#")
{
return true;
}

if (i >= fragmentsTopic.Length)
{
return false;
}

if (!string.Equals(fragmentsFilter[i], fragmentsTopic[i], StringComparison.Ordinal))
{
return false;
}
}

return fragmentsTopic.Length == fragmentsFilter.Length;
}
}
}

+ 184
- 8
Tests/MQTTnet.Core.Tests/AsyncAutoResentEventTests.cs View File

@@ -1,34 +1,210 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Internal;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Core.Tests
{
[TestClass]
// Inspired from the vs-threading tests (https://github.com/Microsoft/vs-threading/blob/master/src/Microsoft.VisualStudio.Threading.Tests/AsyncAutoResetEventTests.cs)
public class AsyncAutoResetEventTests
{
private readonly AsyncAutoResetEvent _aare;

public AsyncAutoResetEventTests()
{
_aare = new AsyncAutoResetEvent();
}

[TestMethod]
public async Task SingleThreadedPulse()
{
for (int i = 0; i < 5; i++)
{
var t = _aare.WaitOneAsync();
Assert.IsFalse(t.IsCompleted);
_aare.Set();
await t;
Assert.IsTrue(t.IsCompleted);
}
}

[TestMethod]
public async Task MultipleSetOnlySignalsOnce()
{
_aare.Set();
_aare.Set();
await _aare.WaitOneAsync();
var t = _aare.WaitOneAsync();
Assert.IsFalse(t.IsCompleted);
await Task.Delay(500);
Assert.IsFalse(t.IsCompleted);
_aare.Set();
await t;
Assert.IsTrue(t.IsCompleted);
}

[TestMethod]
public async Task OrderPreservingQueue()
{
var waiters = new Task[5];
for (int i = 0; i < waiters.Length; i++)
{
waiters[i] = _aare.WaitOneAsync();
}

for (int i = 0; i < waiters.Length; i++)
{
_aare.Set();
await waiters[i].ConfigureAwait(false);
}
}

/// <summary>
/// Verifies that inlining continuations do not have to complete execution before Set() returns.
/// </summary>
[TestMethod]
public async Task SetReturnsBeforeInlinedContinuations()
{
var setReturned = new ManualResetEventSlim();
var inlinedContinuation = _aare.WaitOneAsync()
.ContinueWith(delegate
{
// Arrange to synchronously block the continuation until Set() has returned,
// which would deadlock if Set does not return until inlined continuations complete.
Assert.IsTrue(setReturned.Wait(500));
});
await Task.Delay(100);
_aare.Set();
setReturned.Set();
Assert.IsTrue(inlinedContinuation.Wait(500));
}

[TestMethod]
public void WaitAsync_WithCancellationToken()
{
var cts = new CancellationTokenSource();
Task waitTask = _aare.WaitOneAsync(cts.Token);
Assert.IsFalse(waitTask.IsCompleted);

// Cancel the request and ensure that it propagates to the task.
cts.Cancel();
try
{
waitTask.GetAwaiter().GetResult();
Assert.IsTrue(false, "Task was expected to transition to a canceled state.");
}
catch (OperationCanceledException ex)
{
Assert.AreEqual(cts.Token, ex.CancellationToken);
}

// Now set the event and verify that a future waiter gets the signal immediately.
_aare.Set();
waitTask = _aare.WaitOneAsync();
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status);
}

[TestMethod]
public void WaitAsync_WithCancellationToken_Precanceled()
{
// We construct our own pre-canceled token so that we can do
// a meaningful identity check later.
var tokenSource = new CancellationTokenSource();
tokenSource.Cancel();
var token = tokenSource.Token;

// Verify that a pre-set signal is not reset by a canceled wait request.
_aare.Set();
try
{
_aare.WaitOneAsync(token).GetAwaiter().GetResult();
Assert.IsTrue(false, "Task was expected to transition to a canceled state.");
}
catch (OperationCanceledException ex)
{
Assert.AreEqual(token, ex.CancellationToken);
}

// Verify that the signal was not acquired.
Task waitTask = _aare.WaitOneAsync();
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status);
}

[TestMethod]
public async Task WaitAsync_WithTimeout()
{
Task waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500));
Assert.IsFalse(waitTask.IsCompleted);

// Cancel the request and ensure that it propagates to the task.
await Task.Delay(1000).ConfigureAwait(false);
try
{
waitTask.GetAwaiter().GetResult();
Assert.IsTrue(false, "Task was expected to transition to a timeout state.");
}
catch (TimeoutException)
{
Assert.IsTrue(true);
}

// Now set the event and verify that a future waiter gets the signal immediately.
_aare.Set();
waitTask = _aare.WaitOneAsync(TimeSpan.FromMilliseconds(500));
Assert.AreEqual(TaskStatus.RanToCompletion, waitTask.Status);
}

[TestMethod]
public void WaitAsync_Canceled_DoesNotInlineContinuations()
{
var cts = new CancellationTokenSource();
var task = _aare.WaitOneAsync(cts.Token);

var completingActionFinished = new ManualResetEventSlim();
var continuation = task.ContinueWith(
_ => Assert.IsTrue(completingActionFinished.Wait(500)),
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);

cts.Cancel();
completingActionFinished.Set();

// Rethrow the exception if it turned out it deadlocked.
continuation.GetAwaiter().GetResult();
}

[TestMethod]
public async Task AsyncAutoResetEvent()
{
var aare = new AsyncAutoResetEvent();

var increment = 0;
var globalI = 0;
#pragma warning disable 4014
Task.Run(async () =>
#pragma warning restore 4014
{
await aare.WaitOneAsync(CancellationToken.None);
globalI += increment;
globalI += 1;
});

#pragma warning disable 4014
Task.Run(async () =>
#pragma warning restore 4014
{
await aare.WaitOneAsync(CancellationToken.None);
globalI += 2;
});

await Task.Delay(500);
aare.Set();
await Task.Delay(500);
increment = 1;
aare.Set();
await Task.Delay(100);

Assert.AreEqual(1, globalI);
Assert.AreEqual(3, globalI);
}
}
}
}

+ 2
- 7
Tests/MQTTnet.Core.Tests/AsyncLockTests.cs View File

@@ -21,22 +21,17 @@ namespace MQTTnet.Core.Tests
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
threads[i] = Task.Run(async () =>
{
await @lock.EnterAsync(CancellationToken.None);
try
using (var releaser = await @lock.LockAsync(CancellationToken.None))
{
var localI = globalI;
await Task.Delay(10); // Increase the chance for wrong data.
localI++;
globalI = localI;
}
finally
{
@lock.Exit();
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

Task.WaitAll(threads);
Assert.AreEqual(ThreadsCount, globalI);
}


Loading…
Cancel
Save