Browse Source

Performance optimizations

release/3.x.x
Christian Kratky 7 years ago
parent
commit
bc959a8c3d
8 changed files with 44 additions and 60 deletions
  1. +4
    -0
      Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj
  2. +6
    -6
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  3. +11
    -7
      Frameworks/MQTTnet.NetStandard/Internal/TaskExtensions.cs
  4. +4
    -0
      Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
  5. +4
    -4
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  6. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  7. +1
    -1
      README.md
  8. +13
    -41
      Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs

+ 4
- 0
Frameworks/MQTTnet.AspnetCore/MQTTnet.AspnetCore.csproj View File

@@ -6,6 +6,10 @@
<FileVersion>2.5.2.0</FileVersion> <FileVersion>2.5.2.0</FileVersion>
</PropertyGroup> </PropertyGroup>


<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<DefineConstants>RELEASE;NETSTANDARD2_0</DefineConstants>
</PropertyGroup>

<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.0" /> <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.0.0" /> <PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.0.0" />


+ 6
- 6
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -31,23 +31,23 @@ namespace MQTTnet.Adapter


public IMqttPacketSerializer PacketSerializer { get; } public IMqttPacketSerializer PacketSerializer { get; }


public async Task ConnectAsync(TimeSpan timeout)
public Task ConnectAsync(TimeSpan timeout)
{ {
_logger.Info<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout); _logger.Info<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout);


await ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout));
} }


public async Task DisconnectAsync(TimeSpan timeout)
public Task DisconnectAsync(TimeSpan timeout)
{ {
_logger.Info<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout); _logger.Info<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout);


await ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout));
} }


public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
{ {
await ExecuteAndWrapExceptionAsync(async () =>
return ExecuteAndWrapExceptionAsync(async () =>
{ {
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try try


+ 11
- 7
Frameworks/MQTTnet.NetStandard/Internal/TaskExtensions.cs View File

@@ -9,13 +9,15 @@ namespace MQTTnet.Internal
{ {
public static async Task TimeoutAfter(this Task task, TimeSpan timeout) public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
{ {
using (var cancellationTokenSource = new CancellationTokenSource())
if (task == null) throw new ArgumentNullException(nameof(task));

using (var timeoutCts = new CancellationTokenSource())
{ {
try try
{ {
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var timeoutTask = Task.Delay(timeout, timeoutCts.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false); var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);
if (finishedTask == timeoutTask) if (finishedTask == timeoutTask)
{ {
throw new MqttCommunicationTimedOutException(); throw new MqttCommunicationTimedOutException();
@@ -33,18 +35,20 @@ namespace MQTTnet.Internal
} }
finally finally
{ {
cancellationTokenSource.Cancel();
timeoutCts.Cancel();
} }
} }
} }


public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout) public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout)
{ {
using (var cancellationTokenSource = new CancellationTokenSource())
if (task == null) throw new ArgumentNullException(nameof(task));

using (var timeoutCts = new CancellationTokenSource())
{ {
try try
{ {
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var timeoutTask = Task.Delay(timeout, timeoutCts.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false); var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);


if (finishedTask == timeoutTask) if (finishedTask == timeoutTask)
@@ -66,7 +70,7 @@ namespace MQTTnet.Internal
} }
finally finally
{ {
cancellationTokenSource.Cancel();
timeoutCts.Cancel();
} }
} }
} }


+ 4
- 0
Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj View File

@@ -36,6 +36,10 @@


<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" /> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" />


<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard1.3|AnyCPU'">
<DefineConstants>RELEASE;NETSTANDARD1_3</DefineConstants>
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'"> <ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">
<PackageReference Include="System.Net.Security" Version="4.3.2" /> <PackageReference Include="System.Net.Security" Version="4.3.2" />
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> <PackageReference Include="System.Net.WebSockets" Version="4.3.0" />


+ 4
- 4
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -143,14 +143,14 @@ namespace MQTTnet.Server


private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken) private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
{ {
if (packet is MqttPingReqPacket)
if (packet is MqttPublishPacket publishPacket)
{ {
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
} }


if (packet is MqttPublishPacket publishPacket)
if (packet is MqttPingReqPacket)
{ {
return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
} }


if (packet is MqttPubRelPacket pubRelPacket) if (packet is MqttPubRelPacket pubRelPacket)


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -168,7 +168,7 @@ namespace MQTTnet.Server


lock (_sessions) lock (_sessions)
{ {
foreach (var clientSession in _sessions.Values.ToList())
foreach (var clientSession in _sessions.Values)
{ {
clientSession.EnqueueApplicationMessage(applicationMessage); clientSession.EnqueueApplicationMessage(applicationMessage);
} }


+ 1
- 1
README.md View File

@@ -18,7 +18,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* TLS 1.2 support for client and server (but not UWP servers) * TLS 1.2 support for client and server (but not UWP servers)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS) * Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS)
* Lightweight (only the low level implementation of MQTT, no overhead) * Lightweight (only the low level implementation of MQTT, no overhead)
* Performance optimized (processing ~27.000 messages / second)*
* Performance optimized (processing ~30.000 messages / second)*
* Interfaces included for mocking and testing * Interfaces included for mocking and testing
* Access to internal trace messages * Access to internal trace messages
* Unit tested (70+ tests) * Unit tested (70+ tests)


+ 13
- 41
Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs View File

@@ -43,33 +43,6 @@ namespace MQTTnet.TestApp.NetCore


var client = new MqttFactory().CreateMqttClient(); var client = new MqttFactory().CreateMqttClient();


client.Connected += async (s, e) =>
{
Console.WriteLine("### CONNECTED WITH SERVER ###");

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#")
});

Console.WriteLine("### SUBSCRIBED ###");
};

client.Disconnected += async (s, e) =>
{
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));

try
{
await client.ConnectAsync(options);
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
};

try try
{ {
await client.ConnectAsync(options); await client.ConnectAsync(options);
@@ -79,24 +52,12 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
} }


Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

var testMessageCount = 10000;
var message = CreateMessage(); var message = CreateMessage();
var stopwatch = Stopwatch.StartNew();
for (var i = 0; i < testMessageCount; i++)
{
await client.PublishAsync(message);
}

stopwatch.Stop();
Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");

var messages = new[] { message }; var messages = new[] { message };
var sentMessagesCount = 0;


stopwatch.Restart();
var stopwatch = Stopwatch.StartNew();


var sentMessagesCount = 0;
while (stopwatch.ElapsedMilliseconds < 1000) while (stopwatch.ElapsedMilliseconds < 1000)
{ {
await client.PublishAsync(messages).ConfigureAwait(false); await client.PublishAsync(messages).ConfigureAwait(false);
@@ -104,6 +65,17 @@ namespace MQTTnet.TestApp.NetCore
} }


Console.WriteLine($"Sending {sentMessagesCount} messages per second."); Console.WriteLine($"Sending {sentMessagesCount} messages per second.");
stopwatch.Restart();

var testMessageCount = 10000;
for (var i = 0; i < testMessageCount; i++)
{
await client.PublishAsync(message);
}

stopwatch.Stop();
Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");


var last = DateTime.Now; var last = DateTime.Now;
var msgCount = 0; var msgCount = 0;


Loading…
Cancel
Save