From 312773d0549c5bd2b01881de8841ba6c6258d599 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 22 Dec 2017 12:02:25 +0100 Subject: [PATCH 1/3] Add password protected code signing certificate to enable build process. --- .gitignore | 1 - .../MQTTnet.NetStandard/codeSigningKey.pfx | Bin 0 -> 1764 bytes 2 files changed, 1 deletion(-) create mode 100644 Frameworks/MQTTnet.NetStandard/codeSigningKey.pfx diff --git a/.gitignore b/.gitignore index 1e47707..569e43a 100644 --- a/.gitignore +++ b/.gitignore @@ -292,5 +292,4 @@ Build/nuget.exe *.js *.map -*codeSigningKey.pfx /Tests/MQTTnet.TestApp.NetCore/RetainedMessages.json diff --git a/Frameworks/MQTTnet.NetStandard/codeSigningKey.pfx b/Frameworks/MQTTnet.NetStandard/codeSigningKey.pfx new file mode 100644 index 0000000000000000000000000000000000000000..9374a5dbc11acc2a3a2c507ad3281f755fcbbf45 GIT binary patch literal 1764 zcmZXTcU0477sr1Y{u02E3Rsy6RF6!P5EC4TViG6<5|(8)6bdht4Pn**2~%VhDG)+r zRiF|;Rs(232}_WnBFNAn7@#7v0YUKX&z|>fPfyQ1_qpHuxzByh^T)kd7AS(l5Lgz- z1*NnSO%pfw!BDUw7MO-)fhiD9z_Jic|0p5Kkt{?hgl|D!0LuKYYp*m6Uc`boVOj7x ztTHJ5e+)goABkoX7*ju@%aL$6Y6!`KXGW>@w~Y-`14Vm$%t|J4kDqTZ6P+L4WA}$u z7G|cVzWO=rPM5wfIP-qtyt0lB$q9RZfl;2jH`j5_?H+F9>;~s$TY3gKglNc)7o8NO ze7LbE7vwGTl9VM=d7UKxRL#pd*=>j^2Jg!@OuS4?H+PszXH25Hvpu&B?YF0YC?2ia z+c&>@k2C6vei%8?-AgF9bj+B4I@jH@=B!s{pvnx1wN+0Re(2V7_ge?B>q zdHFIZ!gba3v|lxO;k{Ktous{`Tpu|>`6?-G4*zzsWxE>Vm%$<#P%2#|JnccIM@Gqd zu%^q)oXAITT6lZCy11?0+W2A<2)kju-CK_iCw7+hx$(4#6%wtm4UD_1$L&-yeOC1D zmu3aaXob)(G#136n~%L@2{jtY#{_RJg60lcTq+bM*J^Wp1$|~~>_y@66^1u&x1eXS zRL|PzXw*-N3Vxa7wlGgKJJiPOnwGfS+>U|9#YTA=tch(b{C>^w`GnH-F`6bV>NAP6 z(cu++Wu*^^YW(6UY~b)Kfu>97t4@NW-IbZWvAa&ROV=VtB^)iEv1e!4sc#)eUr1Nv z5v%-6b!#dA#_&$CS7%yR^_&pYgk7;{oG&UcmcgTazem_<<^YeCsRNmfSkStrT?e zS||EX*T5ariA5od^Pu_9SyT4-4idp#zIeEDdB8tyZZL-GpsZyp{I?z5?=WEz_0kH0xwYk_S$89R1TJ>r>J#A#5eptXD0belP2bQGR8iAa&O zHyoOzGlQiC{?4rp>Up{Euzr-RTR=2uUg|4vUU{gha4rG~C3YjItMBMNlp`7{q0 zWZ5(p98}3`bh?R*UvIrI7WnzB8Ykd<@d5MU)`l~za(|88==QO@{Kfm78_oTOp>}l2 zYgtdWF#dj^N|);q&cXzZ{ktJGdv$DwlyQ=;!>1b8+Pv0%5*l!MiRnRK6;yIwM5w=E z!aGh-ZaCyO2JWn-^J7$tVwc2uk2>|M6PFrc_-77iW?)aEi&C&)Nho#UYYz4-_6QaQ z?$O?lfGZ;9Ruv;2rrE~ZwaktU*j$VFvWM4^kCgM2S4msW`lZOXqIU#UV`58>!D@UO G;6DJc77}Ox literal 0 HcmV?d00001 From e1a9aa7d03911472a76f38e7b1cba199871d92cd Mon Sep 17 00:00:00 2001 From: kpreisser Date: Wed, 17 Jan 2018 11:10:39 +0100 Subject: [PATCH 2/3] Adjust the MqttClientPendingMessagesQueue and the MqttPacketReader to use asynchronous methods for long-running/blocking operations. Otherwise, Worker Threads from the .NET Threadpool are blocked while waiting for the next client message, and while waiting for the next message to send. --- .../Adapter/MqttChannelAdapter.cs | 2 +- .../Serializer/MqttPacketReader.cs | 23 +++++++++++-------- .../Server/MqttClientPendingMessagesQueue.cs | 16 +++++++++---- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 0cbcc93..491655d 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -120,7 +120,7 @@ namespace MQTTnet.Adapter private static async Task ReceiveAsync(Stream stream, CancellationToken cancellationToken) { - var header = MqttPacketReader.ReadHeaderFromSource(stream, cancellationToken); + var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken); if (header == null) { return null; diff --git a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs index a22707c..ef07848 100644 --- a/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs +++ b/Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs @@ -22,17 +22,18 @@ namespace MQTTnet.Serializer public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength; - public static MqttPacketHeader ReadHeaderFromSource(Stream stream, CancellationToken cancellationToken) + public static async Task ReadHeaderFromSourceAsync(Stream stream, CancellationToken cancellationToken) { - var buffer = stream.ReadByte(); - if (buffer == -1) + byte[] singleByteBuf = new byte[1]; + var readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); + if (readCount <= 0) { return null; } - - var fixedHeader = (byte)buffer; + + var fixedHeader = singleByteBuf[0]; var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); - var bodyLength = ReadBodyLengthFromSource(stream, cancellationToken); + var bodyLength = await ReadBodyLengthFromSourceAsync(stream, cancellationToken).ConfigureAwait(false); return new MqttPacketHeader { @@ -80,13 +81,15 @@ namespace MQTTnet.Serializer return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } - private static int ReadBodyLengthFromSource(Stream stream, CancellationToken cancellationToken) + private static async Task ReadBodyLengthFromSourceAsync(Stream stream, CancellationToken cancellationToken) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; var value = 0; byte encodedByte; + byte[] singleByteBuf = new byte[1]; + var readBytes = new List(); do { @@ -95,13 +98,13 @@ namespace MQTTnet.Serializer throw new TaskCanceledException(); } - var buffer = stream.ReadByte(); - if (buffer == -1) + int readCount = await stream.ReadAsync(singleByteBuf, 0, singleByteBuf.Length).ConfigureAwait(false); + if (readCount <= 0) { throw new MqttCommunicationException("Connection closed while reading remaining length data."); } - encodedByte = (byte)buffer; + encodedByte = singleByteBuf[0]; readBytes.Add(encodedByte); value += (byte)(encodedByte & 127) * multiplier; diff --git a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs index 2048d52..e092d3b 100644 --- a/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs +++ b/Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs @@ -12,7 +12,8 @@ namespace MQTTnet.Server { public sealed class MqttClientPendingMessagesQueue : IDisposable { - private readonly BlockingCollection _queue = new BlockingCollection(); + private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0); private readonly IMqttServerOptions _options; private readonly MqttClientSession _session; private readonly IMqttNetLogger _logger; @@ -40,7 +41,8 @@ namespace MQTTnet.Server { if (packet == null) throw new ArgumentNullException(nameof(packet)); - _queue.Add(packet); + _queue.Enqueue(packet); + _queueWaitSemaphore.Release(); _logger.Trace("Enqueued packet (ClientId: {0}).", _session.ClientId); } @@ -67,7 +69,10 @@ namespace MQTTnet.Server MqttBasePacket packet = null; try { - packet = _queue.Take(cancellationToken); + await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + if (!_queue.TryDequeue(out packet)) { + throw new InvalidOperationException(); // should not happen + } await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false); _logger.Trace("Enqueued packet sent (ClientId: {0}).", _session.ClientId); @@ -95,7 +100,8 @@ namespace MQTTnet.Server if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { publishPacket.Dup = true; - _queue.Add(packet, CancellationToken.None); + _queue.Enqueue(packet); + _queueWaitSemaphore.Release(); } } @@ -105,7 +111,7 @@ namespace MQTTnet.Server public void Dispose() { - _queue?.Dispose(); + _queueWaitSemaphore?.Dispose(); } } } From bb697e1bd9c9c2344c0eef6db817b97d8dde14ec Mon Sep 17 00:00:00 2001 From: kpreisser Date: Wed, 17 Jan 2018 11:29:48 +0100 Subject: [PATCH 3/3] Follow-Up: Add missing ConfigureAwait() call. --- Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 491655d..57b7972 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -120,7 +120,7 @@ namespace MQTTnet.Adapter private static async Task ReceiveAsync(Stream stream, CancellationToken cancellationToken) { - var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken); + var header = await MqttPacketReader.ReadHeaderFromSourceAsync(stream, cancellationToken).ConfigureAwait(false); if (header == null) { return null;