From c915af8dada22d92b186a2413bc96341a319aeb1 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 12:30:56 -0300 Subject: [PATCH 1/8] Disable Nagle on sockets, send packets in one shot. --- .../Adapter/MqttChannelAdapter.cs | 58 ++++++++----------- .../Implementations/MqttTcpChannel.cs | 3 + .../Implementations/MqttTcpServerAdapter.cs | 4 +- .../Serializer/IMqttPacketSerializer.cs | 2 +- .../Serializer/MqttPacketSerializer.cs | 27 +++++---- .../Serializer/MqttPacketWriter.cs | 15 +++-- 6 files changed, 59 insertions(+), 50 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 123b88d..8adac69 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Threading; @@ -55,52 +56,41 @@ namespace MQTTnet.Adapter return ExecuteAndWrapExceptionAsync(async () => { - await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try + foreach (var packet in packets) { - foreach (var packet in packets) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - if (packet == null) - { - continue; - } - - _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); - - var chunks = PacketSerializer.Serialize(packet); - foreach (var chunk in chunks) - { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - await _channel.SendStream.WriteAsync(chunk.Array, chunk.Offset, chunk.Count, cancellationToken).ConfigureAwait(false); - } - } - if (cancellationToken.IsCancellationRequested) { return; } - if (timeout > TimeSpan.Zero) + if (packet == null) { - await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); + continue; } - else + + _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); + + var packetData = PacketSerializer.Serialize(packet); + if (cancellationToken.IsCancellationRequested) { - await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); + return; } + await _channel.SendStream.WriteAsync(packetData.Array, packetData.Offset, (int)packetData.Count, cancellationToken).ConfigureAwait(false); + } - finally + + if (cancellationToken.IsCancellationRequested) + { + return; + } + + if (timeout > TimeSpan.Zero) + { + await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); + } + else { - _semaphore.Release(); + await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); } }); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 8d43d87..85fe31d 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -60,6 +60,7 @@ namespace MQTTnet.Implementations if (_socket == null) { _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } #if NET452 || NET461 @@ -68,6 +69,8 @@ namespace MQTTnet.Implementations await _socket.ConnectAsync(_options.Server, _options.GetPort()).ConfigureAwait(false); #endif + _socket.NoDelay = true; + if (_options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true), false, InternalUserCertificateValidationCallback); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs index dc33a3f..8f896d5 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs @@ -39,6 +39,8 @@ namespace MQTTnet.Implementations if (options.DefaultEndpointOptions.IsEnabled) { _defaultEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _defaultEndpointSocket.NoDelay = true; + _defaultEndpointSocket.Bind(new IPEndPoint(options.DefaultEndpointOptions.BoundIPAddress, options.GetDefaultEndpointPort())); _defaultEndpointSocket.Listen(options.ConnectionBacklog); @@ -102,7 +104,7 @@ namespace MQTTnet.Implementations #else var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); #endif - + clientSocket.NoDelay=true; var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); } diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs index 6577b0a..6afdd98 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/IMqttPacketSerializer.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Serializer { MqttProtocolVersion ProtocolVersion { get; set; } - ICollection> Serialize(MqttBasePacket mqttPacket); + ArraySegment Serialize(MqttBasePacket mqttPacket); MqttBasePacket Deserialize(MqttPacketHeader header, MemoryStream body); } diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs index 0904c75..75e7239 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs @@ -16,29 +16,36 @@ namespace MQTTnet.Serializer public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - public ICollection> Serialize(MqttBasePacket packet) + public ArraySegment Serialize(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); using (var stream = new MemoryStream(128)) using (var writer = new MqttPacketWriter(stream)) { + //leave enough head space for max header (fixed + 4 variable remaining lenght) + stream.Position = 5; + var fixedHeader = SerializePacket(packet, writer); - var remainingLength = (int)stream.Length; - writer.Write(fixedHeader); - MqttPacketWriter.WriteRemainingLength(remainingLength, writer); - var headerLength = (int)stream.Length - remainingLength; + var remainingLength = MqttPacketWriter.GetRemainingLength((int)stream.Length-5); + + var headerSize = remainingLength.Length + 1; + var headerOffset = 5 - headerSize; + + //position curson on correct offset on beginining of array + stream.Position = headerOffset; + + //write header + writer.Write(fixedHeader); + writer.Write(remainingLength,0,remainingLength.Length); + #if NET461 || NET452 || NETSTANDARD2_0 var buffer = stream.GetBuffer(); #else var buffer = stream.ToArray(); #endif - return new List> - { - new ArraySegment(buffer, remainingLength, headerLength), - new ArraySegment(buffer, 0, remainingLength) - }; + return new ArraySegment(buffer, headerOffset, (int)stream.Length- headerOffset); } } diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs index cb3d458..794a4ee 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Linq; using System.Text; using MQTTnet.Protocol; @@ -55,14 +56,16 @@ namespace MQTTnet.Serializer Write(value); } - public static void WriteRemainingLength(int length, BinaryWriter target) + public static byte[] GetRemainingLength(int length) { if (length == 0) { - target.Write((byte)0); - return; + return new byte[] { (byte)0 }; } + var bytes = new byte[4]; + int arraySize = 0; + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var x = length; do @@ -74,8 +77,12 @@ namespace MQTTnet.Serializer encodedByte = encodedByte | 128; } - target.Write((byte)encodedByte); + bytes[arraySize] = (byte)encodedByte; + + arraySize++; } while (x > 0); + + return bytes.Take(arraySize).ToArray(); } } } From 9bc34e44065550f8a433186df7cc7c2a58e57596 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 13:07:19 -0300 Subject: [PATCH 2/8] Translate timeout exception --- .../Adapter/MqttChannelAdapter.cs | 19 ++++++++++++++++++- .../MqttCommunicationTimedOutException.cs | 7 ++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 8adac69..e853d6b 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Net.Sockets; +using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -111,7 +112,23 @@ namespace MQTTnet.Adapter var timeoutCts = new CancellationTokenSource(timeout); var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); - receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false); + try + { + receivedMqttPacket = await ReceiveAsync(_channel.ReceiveStream, linkedCts.Token).ConfigureAwait(false); + } + catch(OperationCanceledException ex) + { + //check if timed out + if(linkedCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + { + //only timeout token was cancelled + throw new MqttCommunicationTimedOutException(ex); + } + else + { + throw; + } + } } else { diff --git a/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs b/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs index 7d0adcd..744035e 100644 --- a/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs +++ b/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs @@ -1,6 +1,11 @@ -namespace MQTTnet.Exceptions +using System; + +namespace MQTTnet.Exceptions { public sealed class MqttCommunicationTimedOutException : MqttCommunicationException { + public MqttCommunicationTimedOutException() : base() { } + public MqttCommunicationTimedOutException(Exception innerException) : base(innerException) { } + } } From 786fa53334aef6a47a1e9bcbef764528383b3057 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 13:07:35 -0300 Subject: [PATCH 3/8] Fix unit tests for serializer --- .../MqttPacketSerializerTests.cs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 1111d90..a7421c6 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -403,9 +403,9 @@ namespace MQTTnet.Core.Tests private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) { var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; - var chunks = serializer.Serialize(packet); + var data = serializer.Serialize(packet); - Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(chunks))); + Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(Join(data))); } private static void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) @@ -438,5 +438,16 @@ namespace MQTTnet.Core.Tests return buffer.ToArray(); } + + private static byte[] Join(params ArraySegment[] chunks) + { + var buffer = new MemoryStream(); + foreach (var chunk in chunks) + { + buffer.Write(chunk.Array, chunk.Offset, chunk.Count); + } + + return buffer.ToArray(); + } } } From e006377e64b93d5b777d9c5a4e75eadd308adeba Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 13:25:56 -0300 Subject: [PATCH 4/8] Fix zero lenght packets bug --- Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs index 794a4ee..e7cfa4c 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs @@ -58,9 +58,9 @@ namespace MQTTnet.Serializer public static byte[] GetRemainingLength(int length) { - if (length == 0) + if (length <= 0) { - return new byte[] { (byte)0 }; + return new [] { (byte)0 }; } var bytes = new byte[4]; From 11ba46d140d0674cee4949eeb6125e777fa08773 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 14:01:07 -0300 Subject: [PATCH 5/8] Remove useless stream flush --- .../Adapter/MqttChannelAdapter.cs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index e853d6b..d84a8a1 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -76,7 +76,11 @@ namespace MQTTnet.Adapter { return; } - await _channel.SendStream.WriteAsync(packetData.Array, packetData.Offset, (int)packetData.Count, cancellationToken).ConfigureAwait(false); + await _channel.SendStream.WriteAsync( + packetData.Array, + packetData.Offset, + (int)packetData.Count, + cancellationToken).ConfigureAwait(false); } @@ -85,14 +89,6 @@ namespace MQTTnet.Adapter return; } - if (timeout > TimeSpan.Zero) - { - await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false); - } - else - { - await _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false); - } }); } From cd09a2f1760a35c32f672797423755d5513be112 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 14:08:04 -0300 Subject: [PATCH 6/8] Fix code style --- Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs | 5 +---- .../Exceptions/MqttCommunicationTimedOutException.cs | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index d84a8a1..787231e 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -84,10 +84,7 @@ namespace MQTTnet.Adapter } - if (cancellationToken.IsCancellationRequested) - { - return; - } + }); } diff --git a/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs b/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs index 744035e..86f58b3 100644 --- a/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs +++ b/Frameworks/MQTTnet.NetStandard/Exceptions/MqttCommunicationTimedOutException.cs @@ -4,7 +4,7 @@ namespace MQTTnet.Exceptions { public sealed class MqttCommunicationTimedOutException : MqttCommunicationException { - public MqttCommunicationTimedOutException() : base() { } + public MqttCommunicationTimedOutException() { } public MqttCommunicationTimedOutException(Exception innerException) : base(innerException) { } } From 53007425435113bdc17f348862d40ba4c53e20c7 Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 14:30:52 -0300 Subject: [PATCH 7/8] Avoid unnecessary enumeration for every packet --- .../Adapter/IMqttChannelAdapter.cs | 2 +- .../Adapter/MqttChannelAdapter.cs | 54 ++++++++++--------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs index 7a5c02a..8f03e65 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/IMqttChannelAdapter.cs @@ -15,7 +15,7 @@ namespace MQTTnet.Adapter Task DisconnectAsync(TimeSpan timeout); - Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable packets); + Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets); Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken); } diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 787231e..7fffa16 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -51,40 +51,44 @@ namespace MQTTnet.Adapter return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); } - public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable packets) + public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets) + { + for(var i=0;i { - foreach (var packet in packets) + if (cancellationToken.IsCancellationRequested) { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - if (packet == null) - { - continue; - } - - _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); + return; + } + - var packetData = PacketSerializer.Serialize(packet); - if (cancellationToken.IsCancellationRequested) - { - return; - } - await _channel.SendStream.WriteAsync( - packetData.Array, - packetData.Offset, - (int)packetData.Count, - cancellationToken).ConfigureAwait(false); + _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); + var packetData = PacketSerializer.Serialize(packet); + if (cancellationToken.IsCancellationRequested) + { + return; } - - + await _channel.SendStream.WriteAsync( + packetData.Array, + packetData.Offset, + (int)packetData.Count, + cancellationToken).ConfigureAwait(false); }); } From a62c15ad1a096caf16d64e3fe22a6dbd1174d71e Mon Sep 17 00:00:00 2001 From: Israel Lot Date: Mon, 16 Apr 2018 14:46:46 -0300 Subject: [PATCH 8/8] Missing interface implementation --- Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index a898f76..f564e51 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -31,7 +31,7 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable packets) + public Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, MqttBasePacket[] packets) { ThrowIfPartnerIsNull();