From ae11e31048d6178ba66449fa82952bbd0ac9fe2d Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 17 Apr 2018 20:37:56 +0200 Subject: [PATCH] Refactoring --- Build/MQTTnet.nuspec | 1 + .../Adapter/MqttChannelAdapter.cs | 25 ++++++++----------- .../Implementations/MqttTcpChannel.cs | 7 +++--- .../Implementations/MqttTcpServerAdapter.cs | 10 ++++---- .../Serializer/MqttPacketSerializer.cs | 18 ++++++------- .../Serializer/MqttPacketWriter.cs | 10 ++++---- .../PerformanceTest.cs | 4 +-- 7 files changed, 35 insertions(+), 40 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index cb859b1..8a26bd4 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -11,6 +11,7 @@ false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). * [Client] Added new overloads for the message builder. +* [Core] Performance optimizations (thanks to @ israellot) Copyright Christian Kratky 2016-2018 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 7fffa16..087b104 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -1,9 +1,6 @@ using System; -using System.Collections.Generic; using System.IO; -using System.Linq; using System.Net.Sockets; -using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -53,14 +50,13 @@ namespace MQTTnet.Adapter public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets) { - for(var i=0;i("TX >>> {0} [Timeout={1}]", packet, timeout); var packetData = PacketSerializer.Serialize(packet); + if (cancellationToken.IsCancellationRequested) { return; } + await _channel.SendStream.WriteAsync( packetData.Array, packetData.Offset, - (int)packetData.Count, + packetData.Count, cancellationToken).ConfigureAwait(false); + await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); }); } @@ -113,12 +111,11 @@ namespace MQTTnet.Adapter { receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false); } - catch(OperationCanceledException ex) + catch (OperationCanceledException ex) { - //check if timed out - if(linkedCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + var timedOut = linkedCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested; + if (timedOut) { - //only timeout token was cancelled throw new MqttCommunicationTimedOutException(ex); } else diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 85fe31d..0f9bba4 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttChannel { -#if NET452 || NET461 +#if NET452 || NET461 || NETSTANDARD2_0 // ReSharper disable once MemberCanBePrivate.Global // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. @@ -60,7 +60,6 @@ namespace MQTTnet.Implementations if (_socket == null) { _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); - } #if NET452 || NET461 @@ -212,8 +211,8 @@ namespace MQTTnet.Implementations { stream = new NetworkStream(_socket, true); } - -#if NET452 || NET461 + +#if NET452 || NET461 || NETSTANDARD2_0 SendStream = new BufferedStream(stream, _bufferSize); ReceiveStream = new BufferedStream(stream, _bufferSize); #else diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs index 8f896d5..b718dbb 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs @@ -38,8 +38,7 @@ namespace MQTTnet.Implementations if (options.DefaultEndpointOptions.IsEnabled) { - _defaultEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp); - _defaultEndpointSocket.NoDelay = true; + _defaultEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp) { NoDelay = true }; _defaultEndpointSocket.Bind(new IPEndPoint(options.DefaultEndpointOptions.BoundIPAddress, options.GetDefaultEndpointPort())); _defaultEndpointSocket.Listen(options.ConnectionBacklog); @@ -80,7 +79,7 @@ namespace MQTTnet.Implementations _defaultEndpointSocket = null; _tlsCertificate = null; - + _tlsEndpointSocket?.Dispose(); _tlsEndpointSocket = null; @@ -104,7 +103,8 @@ namespace MQTTnet.Implementations #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - clientSocket.NoDelay=true; + clientSocket.NoDelay = true; + var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } @@ -139,7 +139,7 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - + var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs index 75e7239..2447e05 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs @@ -2,7 +2,6 @@ using MQTTnet.Packets; using MQTTnet.Protocol; using System; -using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; @@ -23,29 +22,28 @@ namespace MQTTnet.Serializer using (var stream = new MemoryStream(128)) using (var writer = new MqttPacketWriter(stream)) { - //leave enough head space for max header (fixed + 4 variable remaining lenght) + // Leave enough head space for max header size (fixed + 4 variable remaining length) stream.Position = 5; var fixedHeader = SerializePacket(packet, writer); - var remainingLength = MqttPacketWriter.GetRemainingLength((int)stream.Length-5); + var remainingLength = MqttPacketWriter.GetRemainingLength((int)stream.Length - 5); var headerSize = remainingLength.Length + 1; var headerOffset = 5 - headerSize; - //position curson on correct offset on beginining of array + // Position cursor on correct offset on beginining of array (has leading 0x0) stream.Position = headerOffset; - - //write header + writer.Write(fixedHeader); - writer.Write(remainingLength,0,remainingLength.Length); - + writer.Write(remainingLength, 0, remainingLength.Length); + #if NET461 || NET452 || NETSTANDARD2_0 var buffer = stream.GetBuffer(); #else var buffer = stream.ToArray(); #endif - return new ArraySegment(buffer, headerOffset, (int)stream.Length- headerOffset); + return new ArraySegment(buffer, headerOffset, (int)stream.Length - headerOffset); } } @@ -300,7 +298,7 @@ namespace MQTTnet.Serializer return packet; } - + private static void ValidateConnectPacket(MqttConnectPacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs index e7cfa4c..253294b 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs @@ -60,11 +60,11 @@ namespace MQTTnet.Serializer { if (length <= 0) { - return new [] { (byte)0 }; + return new[] { (byte)0 }; } var bytes = new byte[4]; - int arraySize = 0; + var offset = 0; // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var x = length; @@ -77,12 +77,12 @@ namespace MQTTnet.Serializer encodedByte = encodedByte | 128; } - bytes[arraySize] = (byte)encodedByte; + bytes[offset] = (byte)encodedByte; - arraySize++; + offset++; } while (x > 0); - return bytes.Take(arraySize).ToArray(); + return bytes.Take(offset).ToArray(); } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index 5f2e5ad..b485533 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -44,7 +44,7 @@ namespace MQTTnet.TestApp.NetCore try { - await client.ConnectAsync(options); + await client.ConnectAsync(options).ConfigureAwait(false); } catch (Exception exception) { @@ -165,7 +165,7 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine("Press any key to exit."); Console.ReadLine(); - await mqttServer.StopAsync(); + await mqttServer.StopAsync().ConfigureAwait(false); } catch (Exception e) {