From 7240b86d984497fff09da3c6b8942cd2a668b54b Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 09:25:29 +0200 Subject: [PATCH 1/6] fixed another build issue --- Build/build.ps1 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Build/build.ps1 b/Build/build.ps1 index 0b6f074..e57eb73 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -17,7 +17,7 @@ if ($path) { &$msbuild ..\Source\MQTTnet\MQTTnet.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" # Build the ASP.NET Core 2.0 extension - &$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Source\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" # Build the RPC extension &$msbuild ..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" From 4e9abd41c0680cef72b034d942f5f9f11e947382 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 12:31:19 +0200 Subject: [PATCH 2/6] improve error messages --- Source/MQTTnet/Serializer/MqttPacketBodyReader.cs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs b/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs index d751c68..2cf37dd 100644 --- a/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketBodyReader.cs @@ -20,6 +20,7 @@ namespace MQTTnet.Serializer public byte ReadByte() { + ValidateReceiveBuffer(1); return _buffer[_offset++]; } @@ -30,6 +31,8 @@ namespace MQTTnet.Serializer public ushort ReadUInt16() { + ValidateReceiveBuffer(2); + var msb = _buffer[_offset++]; var lsb = _buffer[_offset++]; @@ -40,12 +43,22 @@ namespace MQTTnet.Serializer { var length = ReadUInt16(); + ValidateReceiveBuffer(length); + var result = new ArraySegment(_buffer, _offset, length); _offset += length; return result; } + private void ValidateReceiveBuffer(ushort length) + { + if (_buffer.Length < _offset + length) + { + throw new ArgumentOutOfRangeException(nameof(_buffer), $"expected at least {_offset + length} bytes but there are only {_buffer.Length} bytes"); + } + } + public string ReadStringWithLengthPrefix() { var buffer = ReadWithLengthPrefix(); From 4e40b2ddf31fd3bdd28a6e0ae3c039c5795dd993 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 12:31:47 +0200 Subject: [PATCH 3/6] added test to prove something is broken --- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 67 +++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 7c4faf0..ea258f5 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Core.Tests { @@ -167,6 +168,72 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(1, receivedMessagesCount); } + [TestMethod] + public async Task MqttServer_Publish_MultipleClients() + { + var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + var receivedMessagesCount = 0; + var locked = new object(); + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .Build(); + var clientOptions2 = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .Build(); + + try + { + await s.StartAsync(new MqttServerOptions()); + + var c1 = new MqttFactory().CreateMqttClient(); + var c2 = new MqttFactory().CreateMqttClient(); + + await c1.ConnectAsync(clientOptions); + await c2.ConnectAsync(clientOptions2); + + c1.ApplicationMessageReceived += (_, __) => + { + lock (locked) + { + receivedMessagesCount++; + } + }; + + c2.ApplicationMessageReceived += (_, __) => + { + lock (locked) + { + receivedMessagesCount++; + } + }; + + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + await c2.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + + //await Task.WhenAll(Publish(c1, message), Publish(c2, message)); + await Publish(c1, message); + + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } + + Assert.AreEqual(2000, receivedMessagesCount); + } + + private static async Task Publish(IMqttClient c1, MqttApplicationMessage message) + { + for (int i = 0; i < 1000; i++) + { + await c1.PublishAsync(message); + } + } + [TestMethod] public async Task MqttServer_RetainedMessagesFlow() { From 7dfb6fdc5242710e100c47f2f474752e1882018d Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 14:51:46 +0200 Subject: [PATCH 4/6] fixed dup flag serialization and added roundtrip tests --- Source/MQTTnet/Packets/MqttPubAckPacket.cs | 2 +- .../Serializer/MqttPacketSerializer.cs | 2 +- .../MqttPacketSerializerTests.cs | 77 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Packets/MqttPubAckPacket.cs b/Source/MQTTnet/Packets/MqttPubAckPacket.cs index 3015f27..5811849 100644 --- a/Source/MQTTnet/Packets/MqttPubAckPacket.cs +++ b/Source/MQTTnet/Packets/MqttPubAckPacket.cs @@ -4,7 +4,7 @@ { public override string ToString() { - return "PubAck"; + return $"PubAck [PacketIdentifier={PacketIdentifier}]"; } } } diff --git a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs index 537513d..f1eb790 100644 --- a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs +++ b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs @@ -190,7 +190,7 @@ namespace MQTTnet.Serializer var retain = (receivedMqttPacket.FixedHeader & 0x1) > 0; var qualityOfServiceLevel = (MqttQualityOfServiceLevel)(receivedMqttPacket.FixedHeader >> 1 & 0x3); - var dup = (receivedMqttPacket.FixedHeader & 0x3) > 0; + var dup = (receivedMqttPacket.FixedHeader & 0x8) > 0; var topic = receivedMqttPacket.Body.ReadStringWithLengthPrefix(); diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 0a0fa2b..61b69eb 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -199,6 +199,64 @@ namespace MQTTnet.Core.Tests DeserializeAndCompare(p, "Ow4ABUEvQi9DAHtIRUxMTw=="); } + [TestMethod] + public void DeserializeV311_MqttPublishPacket_Qos1() + { + var p = new MqttPublishPacket + { + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce, + }; + + var p2 = Roundtrip(p); + + Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel); + Assert.AreEqual(p.Dup, p2.Dup); + } + + [TestMethod] + public void DeserializeV311_MqttPublishPacket_Qos2() + { + var p = new MqttPublishPacket + { + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, + PacketIdentifier = 1 + }; + + var p2 = Roundtrip(p); + + Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel); + Assert.AreEqual(p.Dup, p2.Dup); + } + + [TestMethod] + public void DeserializeV311_MqttPublishPacket_Qos3() + { + var p = new MqttPublishPacket + { + QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce, + PacketIdentifier = 1 + }; + + var p2 = Roundtrip(p); + + Assert.AreEqual(p.QualityOfServiceLevel, p2.QualityOfServiceLevel); + Assert.AreEqual(p.Dup, p2.Dup); + } + + + [TestMethod] + public void DeserializeV311_MqttPublishPacket_DupFalse() + { + var p = new MqttPublishPacket + { + Dup = false, + }; + + var p2 = Roundtrip(p); + + Assert.AreEqual(p.Dup, p2.Dup); + } + [TestMethod] public void SerializeV311_MqttPubAckPacket() { @@ -430,6 +488,25 @@ namespace MQTTnet.Core.Tests } } + private static T Roundtrip(T packet, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) + where T : MqttBasePacket + { + var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; + + var buffer1 = serializer.Serialize(packet); + + using (var headerStream = new MemoryStream(Join(buffer1))) + { + var channel = new TestMqttChannel(headerStream); + var header = MqttPacketReader.ReadFixedHeaderAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); + + using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength)) + { + return (T)serializer.Deserialize(new ReceivedMqttPacket(header.Flags, new MqttPacketBodyReader(bodyStream.ToArray(), 0))); + } + } + } + private static byte[] Join(params ArraySegment[] chunks) { var buffer = new MemoryStream(); From 5923c7c4085fed0755262fc3b85cb2755a554ae0 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 14:52:31 +0200 Subject: [PATCH 5/6] fixed negative length for empty packages --- Source/MQTTnet/Serializer/MqttPacketWriter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet/Serializer/MqttPacketWriter.cs b/Source/MQTTnet/Serializer/MqttPacketWriter.cs index f6784d4..c7f1bc2 100644 --- a/Source/MQTTnet/Serializer/MqttPacketWriter.cs +++ b/Source/MQTTnet/Serializer/MqttPacketWriter.cs @@ -99,7 +99,7 @@ namespace MQTTnet.Serializer public void Reset() { - Length = 0; + Length = 5; } public void Seek(int offset) From b77798f8853e9fdf2f0c5e7591224a8100b9be22 Mon Sep 17 00:00:00 2001 From: Jan Eggers Date: Mon, 25 Jun 2018 15:31:49 +0200 Subject: [PATCH 6/6] dont use thread static buffers they are overlapping on server due to async io --- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 6 +++- Source/MQTTnet/Serializer/MqttPacketReader.cs | 35 +++++-------------- .../MQTTnet.Benchmarks/SerializerBenchmark.cs | 4 ++- .../MqttPacketReaderTests.cs | 4 ++- .../MqttPacketSerializerTests.cs | 9 +++-- 5 files changed, 27 insertions(+), 31 deletions(-) diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 61912ef..0059f20 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -23,6 +23,10 @@ namespace MQTTnet.Adapter private readonly IMqttNetChildLogger _logger; private readonly IMqttChannel _channel; + private readonly byte[] _fixedHeaderBuffer = new byte[2]; + + private readonly byte[] _singleByteBuffer = new byte[1]; + private bool _isDisposed; public MqttChannelAdapter(IMqttChannel channel, IMqttPacketSerializer serializer, IMqttNetChildLogger logger) @@ -163,7 +167,7 @@ namespace MQTTnet.Adapter private async Task ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) { - var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, cancellationToken).ConfigureAwait(false); + var fixedHeader = await MqttPacketReader.ReadFixedHeaderAsync(channel, _fixedHeaderBuffer, _singleByteBuffer, cancellationToken).ConfigureAwait(false); try { diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index 826747c..32a9285 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -9,19 +9,13 @@ namespace MQTTnet.Serializer { public static class MqttPacketReader { - [ThreadStatic] - private static byte[] _fixedHeaderBuffer; - - [ThreadStatic] - private static byte[] _singleByteBuffer; - - public static async Task ReadFixedHeaderAsync(IMqttChannel channel, CancellationToken cancellationToken) + public static async Task ReadFixedHeaderAsync(IMqttChannel channel, byte[] fixedHeaderBuffer, byte[] singleByteBuffer, CancellationToken cancellationToken) { // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. // So in all cases at least 2 bytes must be read for a complete MQTT packet. // async/await is used here because the next packet is received in a couple of minutes so the performance // impact is acceptable according to a useless waiting thread. - var buffer = InitializeFixedHeaderBuffer(); + var buffer = fixedHeaderBuffer; var totalBytesRead = 0; while (totalBytesRead < buffer.Length) @@ -41,12 +35,12 @@ namespace MQTTnet.Serializer { return new MqttFixedHeader(buffer[0], 0); } - - var bodyLength = ReadBodyLength(channel, buffer[1], cancellationToken); + + var bodyLength = ReadBodyLength(channel, buffer[1], singleByteBuffer, cancellationToken); return new MqttFixedHeader(buffer[0], bodyLength); } - private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, CancellationToken cancellationToken) + private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, byte[] singleByteBuffer, CancellationToken cancellationToken) { // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. var multiplier = 128; @@ -61,7 +55,7 @@ namespace MQTTnet.Serializer // is too big for reading 1 byte in a row. We expect that the remaining data was sent // directly after the initial bytes. If the client disconnects just in this moment we // will get an exception anyway. - encodedByte = ReadByte(channel, cancellationToken); + encodedByte = ReadByte(channel, singleByteBuffer, cancellationToken); value += (byte)(encodedByte & 127) * multiplier; if (multiplier > 128 * 128 * 128) @@ -75,27 +69,16 @@ namespace MQTTnet.Serializer return value; } - private static byte ReadByte(IMqttChannel channel, CancellationToken cancellationToken) + private static byte ReadByte(IMqttChannel channel, byte[] singleByteBuffer, CancellationToken cancellationToken) { - var buffer = InitializeSingleByteBuffer(); - var readCount = channel.ReadAsync(buffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); + var readCount = channel.ReadAsync(singleByteBuffer, 0, 1, cancellationToken).GetAwaiter().GetResult(); if (readCount <= 0) { cancellationToken.ThrowIfCancellationRequested(); ExceptionHelper.ThrowGracefulSocketClose(); } - return buffer[0]; - } - - private static byte[] InitializeFixedHeaderBuffer() - { - return _fixedHeaderBuffer ?? (_fixedHeaderBuffer = new byte[2]); - } - - private static byte[] InitializeSingleByteBuffer() - { - return _singleByteBuffer ?? (_singleByteBuffer = new byte[1]); + return singleByteBuffer[0]; } } } diff --git a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs index 28de60f..8beb7b4 100644 --- a/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -47,12 +47,14 @@ namespace MQTTnet.Benchmarks public void Deserialize_10000_Messages() { var channel = new BenchmarkMqttChannel(_serializedPacket); + var fixedHeader = new byte[2]; + var singleByteBuffer = new byte[1]; for (var i = 0; i < 10000; i++) { channel.Reset(); - var header = MqttPacketReader.ReadFixedHeaderAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); + var header = MqttPacketReader.ReadFixedHeaderAsync(channel, fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult(); var receivedPacket = new ReceivedMqttPacket( header.Flags, diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs index f983622..39aeeff 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs @@ -14,7 +14,9 @@ namespace MQTTnet.Core.Tests [ExpectedException(typeof(MqttCommunicationClosedGracefullyException))] public void MqttPacketReader_EmptyStream() { - MqttPacketReader.ReadFixedHeaderAsync(new TestMqttChannel(new MemoryStream()), CancellationToken.None).GetAwaiter().GetResult(); + var fixedHeader = new byte[2]; + var singleByteBuffer = new byte[1]; + MqttPacketReader.ReadFixedHeaderAsync(new TestMqttChannel(new MemoryStream()), fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult(); } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 61b69eb..d8d23e2 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -476,7 +476,9 @@ namespace MQTTnet.Core.Tests using (var headerStream = new MemoryStream(Join(buffer1))) { var channel = new TestMqttChannel(headerStream); - var header = MqttPacketReader.ReadFixedHeaderAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); + var fixedHeader = new byte[2]; + var singleByteBuffer = new byte[1]; + var header = MqttPacketReader.ReadFixedHeaderAsync(channel, fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult(); using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength)) { @@ -498,7 +500,10 @@ namespace MQTTnet.Core.Tests using (var headerStream = new MemoryStream(Join(buffer1))) { var channel = new TestMqttChannel(headerStream); - var header = MqttPacketReader.ReadFixedHeaderAsync(channel, CancellationToken.None).GetAwaiter().GetResult(); + var fixedHeader = new byte[2]; + var singleByteBuffer = new byte[1]; + + var header = MqttPacketReader.ReadFixedHeaderAsync(channel, fixedHeader, singleByteBuffer, CancellationToken.None).GetAwaiter().GetResult(); using (var bodyStream = new MemoryStream(Join(buffer1), (int)headerStream.Position, header.RemainingLength)) {