From caea7910b4b11b36486836fdc6f31a6cb96cd71b Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 16 Jun 2018 14:30:02 +0200 Subject: [PATCH] Refactor message processing and async/await usage. --- Build/MQTTnet.Extensions.ManagedClient.nuspec | 4 +- Build/MQTTnet.Extensions.Rpc.nuspec | 4 +- Build/MQTTnet.nuspec | 4 +- Source/MQTTnet/Adapter/IMqttChannelAdapter.cs | 2 +- Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 160 +++++++---- Source/MQTTnet/Client/MqttClient.cs | 38 ++- .../TargetFrameworkInfoProvider.cs | 2 + .../MQTTnet/Implementations/MqttTcpChannel.cs | 2 +- .../Implementations/MqttTcpServerAdapter.cs | 2 +- .../Implementations/MqttTcpServerListener.cs | 7 +- Source/MQTTnet/Internal/TaskExtensions.cs | 4 +- Source/MQTTnet/MQTTnet.csproj | 5 +- Source/MQTTnet/Serializer/ByteReader.cs | 48 ---- Source/MQTTnet/Serializer/ByteWriter.cs | 36 --- Source/MQTTnet/Serializer/MqttPacketReader.cs | 12 +- .../Serializer/MqttPacketSerializer.cs | 222 ++++++++------- Source/MQTTnet/Serializer/MqttPacketWriter.cs | 128 +++++++-- ...ue.cs => MqttClientPendingPacketsQueue.cs} | 8 +- Source/MQTTnet/Server/MqttClientSession.cs | 41 ++- .../Server/MqttClientSessionsManager.cs | 262 ++++++++++-------- .../Server/MqttEnqueuedApplicationMessage.cs | 15 + Source/MQTTnet/Server/MqttServer.cs | 11 +- ...esult.cs => PrepareClientSessionResult.cs} | 2 +- .../ChannelAdapterBenchmark.cs | 2 +- Tests/MQTTnet.Core.Tests/ByteReaderTests.cs | 30 -- Tests/MQTTnet.Core.Tests/ByteWriterTests.cs | 51 ---- Tests/MQTTnet.Core.Tests/ExtensionTests.cs | 12 +- .../MqttPacketReaderTests.cs | 2 +- .../TestMqttCommunicationAdapter.cs | 3 +- .../PerformanceTest.cs | 98 +++---- Tests/MQTTnet.TestApp.NetCore/Program.cs | 3 +- 31 files changed, 586 insertions(+), 634 deletions(-) delete mode 100644 Source/MQTTnet/Serializer/ByteReader.cs delete mode 100644 Source/MQTTnet/Serializer/ByteWriter.cs rename Source/MQTTnet/Server/{MqttClientPendingMessagesQueue.cs => MqttClientPendingPacketsQueue.cs} (93%) create mode 100644 Source/MQTTnet/Server/MqttEnqueuedApplicationMessage.cs rename Source/MQTTnet/Server/{GetOrCreateClientSessionResult.cs => PrepareClientSessionResult.cs} (76%) delete mode 100644 Tests/MQTTnet.Core.Tests/ByteReaderTests.cs delete mode 100644 Tests/MQTTnet.Core.Tests/ByteWriterTests.cs diff --git a/Build/MQTTnet.Extensions.ManagedClient.nuspec b/Build/MQTTnet.Extensions.ManagedClient.nuspec index e681554..714ed9e 100644 --- a/Build/MQTTnet.Extensions.ManagedClient.nuspec +++ b/Build/MQTTnet.Extensions.ManagedClient.nuspec @@ -48,6 +48,8 @@ - + + + \ No newline at end of file diff --git a/Build/MQTTnet.Extensions.Rpc.nuspec b/Build/MQTTnet.Extensions.Rpc.nuspec index f8667c4..16a51c2 100644 --- a/Build/MQTTnet.Extensions.Rpc.nuspec +++ b/Build/MQTTnet.Extensions.Rpc.nuspec @@ -48,6 +48,8 @@ - + + + \ No newline at end of file diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 25c31fb..5d58437 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -69,6 +69,8 @@ - + + + \ No newline at end of file diff --git a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs index e026e7b..43e5c8c 100644 --- a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs @@ -20,7 +20,7 @@ namespace MQTTnet.Adapter Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken); - Task SendPacketAsync(TimeSpan timeout, MqttBasePacket packet, CancellationToken cancellationToken); + Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken); Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken); } diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 4fc681a..7db9a2a 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -40,52 +40,84 @@ namespace MQTTnet.Adapter public event EventHandler ReadingPacketStarted; public event EventHandler ReadingPacketCompleted; - public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); - _logger.Verbose("Connecting [Timeout={0}]", timeout); - return ExecuteAndWrapExceptionAsync(() => - Internal.TaskExtensions.TimeoutAfter(ct => _channel.ConnectAsync(ct), timeout, cancellationToken)); + try + { + _logger.Verbose("Connecting [Timeout={0}]", timeout); + + await Internal.TaskExtensions + .TimeoutAfterAsync(ct => _channel.ConnectAsync(ct), timeout, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception exception) + { + if (IsWrappedException(exception)) + { + throw; + } + + WrapException(exception); + } } - public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + public async Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); - _logger.Verbose("Disconnecting [Timeout={0}]", timeout); - return ExecuteAndWrapExceptionAsync(() => - Internal.TaskExtensions.TimeoutAfter(ct => _channel.DisconnectAsync(), timeout, cancellationToken)); + try + { + _logger.Verbose("Disconnecting [Timeout={0}]", timeout); + + await Internal.TaskExtensions + .TimeoutAfterAsync(ct => _channel.DisconnectAsync(), timeout, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception exception) + { + if (IsWrappedException(exception)) + { + throw; + } + + WrapException(exception); + } } - public Task SendPacketAsync(TimeSpan timeout, MqttBasePacket packet, CancellationToken cancellationToken) + public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { - return ExecuteAndWrapExceptionAsync(() => + try { - _logger.Verbose("TX >>> {0} [Timeout={1}]", packet, timeout); + _logger.Verbose("TX >>> {0}", packet); var packetData = PacketSerializer.Serialize(packet); - return Internal.TaskExtensions.TimeoutAfter(ct => _channel.WriteAsync( - packetData.Array, - packetData.Offset, - packetData.Count, - ct), timeout, cancellationToken); - }); + await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false); + } + catch (Exception exception) + { + if (IsWrappedException(exception)) + { + throw; + } + + WrapException(exception); + } } public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) { ThrowIfDisposed(); - MqttBasePacket packet = null; - await ExecuteAndWrapExceptionAsync(async () => + try { ReceivedMqttPacket receivedMqttPacket; if (timeout > TimeSpan.Zero) { - receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfter(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false); + receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ct => ReceiveAsync(_channel, ct), timeout, cancellationToken).ConfigureAwait(false); } else { @@ -94,19 +126,30 @@ namespace MQTTnet.Adapter if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested) { - return; + return null; } - packet = PacketSerializer.Deserialize(receivedMqttPacket); + var packet = PacketSerializer.Deserialize(receivedMqttPacket); if (packet == null) { throw new MqttProtocolViolationException("Received malformed packet."); } _logger.Verbose("RX <<< {0}", packet); - }).ConfigureAwait(false); + + return packet; + } + catch (Exception exception) + { + if (IsWrappedException(exception)) + { + throw; + } + + WrapException(exception); + } - return packet; + return null; } private async Task ReceiveAsync(IMqttChannel channel, CancellationToken cancellationToken) @@ -138,7 +181,9 @@ namespace MQTTnet.Adapter chunkSize = bytesLeft; } - var readBytes = await channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken) .ConfigureAwait(false); + // async/await is not used to avoid the overhead of context switches. We assume that the reamining data + // has been sent from the sender directly after the initial bytes. + var readBytes = channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).GetAwaiter().GetResult(); if (readBytes <= 0) { ExceptionHelper.ThrowGracefulSocketClose(); @@ -155,42 +200,6 @@ namespace MQTTnet.Adapter } } - private static async Task ExecuteAndWrapExceptionAsync(Func action) - { - try - { - await action().ConfigureAwait(false); - } - catch (Exception exception) - { - if (exception is TaskCanceledException || - exception is OperationCanceledException || - exception is MqttCommunicationTimedOutException || - exception is MqttCommunicationException) - { - throw; - } - - if (exception is IOException && exception.InnerException is SocketException socketException) - { - if (socketException.SocketErrorCode == SocketError.ConnectionAborted) - { - throw new OperationCanceledException(); - } - } - - if (exception is COMException comException) - { - if ((uint)comException.HResult == ErrorOperationAborted) - { - throw new OperationCanceledException(); - } - } - - throw new MqttCommunicationException(exception); - } - } - public void Dispose() { _isDisposed = true; @@ -205,5 +214,34 @@ namespace MQTTnet.Adapter throw new ObjectDisposedException(nameof(MqttChannelAdapter)); } } + + private static bool IsWrappedException(Exception exception) + { + return exception is TaskCanceledException || + exception is OperationCanceledException || + exception is MqttCommunicationTimedOutException || + exception is MqttCommunicationException; + } + + private static void WrapException(Exception exception) + { + if (exception is IOException && exception.InnerException is SocketException socketException) + { + if (socketException.SocketErrorCode == SocketError.ConnectionAborted) + { + throw new OperationCanceledException(); + } + } + + if (exception is COMException comException) + { + if ((uint)comException.HResult == ErrorOperationAborted) + { + throw new OperationCanceledException(); + } + } + + throw new MqttCommunicationException(exception); + } } } diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 79a0ecf..e9362f6 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Client { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly Stopwatch _sendTracker = new Stopwatch(); - private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1); + private readonly object _disconnectLock = new object(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly IMqttClientAdapterFactory _adapterFactory; @@ -215,7 +215,7 @@ namespace MQTTnet.Client private async Task DisconnectInternalAsync(Task sender, Exception exception) { - await InitiateDisconnectAsync().ConfigureAwait(false); + InitiateDisconnect(); var clientWasConnected = IsConnected; IsConnected = false; @@ -249,25 +249,23 @@ namespace MQTTnet.Client } } - private async Task InitiateDisconnectAsync() + private void InitiateDisconnect() { - await _disconnectLock.WaitAsync().ConfigureAwait(false); - try + lock (_disconnectLock) { - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + try { - return; - } + if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + { + return; + } - _cancellationTokenSource.Cancel(false); - } - catch (Exception adapterException) - { - _logger.Warning(adapterException, "Error while initiating disconnect."); - } - finally - { - _disconnectLock.Release(); + _cancellationTokenSource.Cancel(false); + } + catch (Exception adapterException) + { + _logger.Warning(adapterException, "Error while initiating disconnect."); + } } } @@ -279,7 +277,7 @@ namespace MQTTnet.Client } _sendTracker.Restart(); - return _adapter.SendPacketAsync(_options.CommunicationTimeout, packet, cancellationToken); + return _adapter.SendPacketAsync(packet, cancellationToken); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket @@ -300,8 +298,8 @@ namespace MQTTnet.Client var packetAwaiter = _packetDispatcher.AddPacketAwaiter(identifier); try { - await _adapter.SendPacketAsync(_options.CommunicationTimeout, requestPacket, cancellationToken).ConfigureAwait(false); - var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); + await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false); + var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false); return (TResponsePacket)respone; } diff --git a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs b/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs index 5258276..efbf08b 100644 --- a/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs +++ b/Source/MQTTnet/Diagnostics/TargetFrameworkInfoProvider.cs @@ -10,6 +10,8 @@ return "net452"; #elif NET461 return "net461"; +#elif NET472 + return "net472"; #elif NETSTANDARD1_3 return "netstandard1.3"; #elif NETSTANDARD2_0 diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 20829be..c48a06f 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -1,4 +1,4 @@ -#if NET452 || NET461 || NETSTANDARD1_3 || NETSTANDARD2_0 +#if !WINDOWS_UWP using System; using System.Net.Security; using System.Net.Sockets; diff --git a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs index 8783e9c..eeefb34 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs @@ -1,4 +1,4 @@ -#if NET452 || NET461 || NETSTANDARD1_3 || NETSTANDARD2_0 +#if !WINDOWS_UWP using System; using System.Collections.Generic; using System.Net.Sockets; diff --git a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs index fccd77f..b77d9b3 100644 --- a/Source/MQTTnet/Implementations/MqttTcpServerListener.cs +++ b/Source/MQTTnet/Implementations/MqttTcpServerListener.cs @@ -1,4 +1,4 @@ -#if NET452 || NET461 || NETSTANDARD1_3 || NETSTANDARD2_0 +#if !WINDOWS_UWP using System; using System.Net; using System.Net.Security; @@ -76,7 +76,8 @@ namespace MQTTnet.Implementations await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); } - _logger.Verbose($"Client '{clientSocket.RemoteEndPoint}' accepted by TCP listener '{_socket.LocalEndPoint}, {_addressFamily}'."); + var protocol = _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"; + _logger.Verbose($"Client '{clientSocket.RemoteEndPoint}' accepted by TCP listener '{_socket.LocalEndPoint}, {protocol}'."); var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer(), _logger); ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter)); @@ -102,7 +103,7 @@ namespace MQTTnet.Implementations { _socket?.Dispose(); -#if NETSTANDARD1_3 || NETSTANDARD2_0 || NET461 +#if NETSTANDARD1_3 || NETSTANDARD2_0 || NET461 || NET472 _tlsCertificate?.Dispose(); #endif } diff --git a/Source/MQTTnet/Internal/TaskExtensions.cs b/Source/MQTTnet/Internal/TaskExtensions.cs index 288ac0b..1356d97 100644 --- a/Source/MQTTnet/Internal/TaskExtensions.cs +++ b/Source/MQTTnet/Internal/TaskExtensions.cs @@ -7,7 +7,7 @@ namespace MQTTnet.Internal { public static class TaskExtensions { - public static async Task TimeoutAfter(Func action, TimeSpan timeout, CancellationToken cancellationToken) + public static async Task TimeoutAfterAsync(Func action, TimeSpan timeout, CancellationToken cancellationToken) { if (action == null) throw new ArgumentNullException(nameof(action)); @@ -31,7 +31,7 @@ namespace MQTTnet.Internal } } - public static async Task TimeoutAfter(Func> action, TimeSpan timeout, CancellationToken cancellationToken) + public static async Task TimeoutAfterAsync(Func> action, TimeSpan timeout, CancellationToken cancellationToken) { if (action == null) throw new ArgumentNullException(nameof(action)); diff --git a/Source/MQTTnet/MQTTnet.csproj b/Source/MQTTnet/MQTTnet.csproj index 8364378..4afd09d 100644 --- a/Source/MQTTnet/MQTTnet.csproj +++ b/Source/MQTTnet/MQTTnet.csproj @@ -1,7 +1,7 @@  - netstandard1.3;netstandard2.0;net452;net461;uap10.0 + netstandard1.3;netstandard2.0;net452;uap10.0 netstandard1.3;netstandard2.0 MQTTnet MQTTnet @@ -62,7 +62,4 @@ - - - \ No newline at end of file diff --git a/Source/MQTTnet/Serializer/ByteReader.cs b/Source/MQTTnet/Serializer/ByteReader.cs deleted file mode 100644 index 461bba3..0000000 --- a/Source/MQTTnet/Serializer/ByteReader.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; - -namespace MQTTnet.Serializer -{ - public class ByteReader - { - private readonly int _source; - private int _index; - - public ByteReader(int source) - { - _source = source; - } - - public bool Read() - { - if (_index >= 8) - { - throw new InvalidOperationException("End of byte reached."); - } - - var result = ((1 << _index) & _source) > 0; - _index++; - return result; - } - - public int Read(int count) - { - if (_index + count > 8) - { - throw new InvalidOperationException("End of byte will be reached."); - } - - var result = 0; - for (var i = 0; i < count; i++) - { - if (((1 << _index) & _source) > 0) - { - result |= 1 << i; - } - - _index++; - } - - return result; - } - } -} diff --git a/Source/MQTTnet/Serializer/ByteWriter.cs b/Source/MQTTnet/Serializer/ByteWriter.cs deleted file mode 100644 index 9ae2156..0000000 --- a/Source/MQTTnet/Serializer/ByteWriter.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System; - -namespace MQTTnet.Serializer -{ - public class ByteWriter - { - private int _index; - private int _byte; - - public byte Value => (byte)_byte; - - public void Write(int @byte, int count) - { - for (var i = 0; i < count; i++) - { - var value = ((1 << i) & @byte) > 0; - Write(value); - } - } - - public void Write(bool bit) - { - if (_index >= 8) - { - throw new InvalidOperationException("End of the byte reached."); - } - - if (bit) - { - _byte |= 1 << _index; - } - - _index++; - } - } -} diff --git a/Source/MQTTnet/Serializer/MqttPacketReader.cs b/Source/MQTTnet/Serializer/MqttPacketReader.cs index 7ed918f..d50fae5 100644 --- a/Source/MQTTnet/Serializer/MqttPacketReader.cs +++ b/Source/MQTTnet/Serializer/MqttPacketReader.cs @@ -12,6 +12,8 @@ namespace MQTTnet.Serializer { // The MQTT fixed header contains 1 byte of flags and at least 1 byte for the remaining data length. // So in all cases at least 2 bytes must be read for a complete MQTT packet. + // async/await is used here because the next packet is received in a couple of minutes so the performance + // impact is acceptable according to a useless waiting thread. var buffer = new byte[2]; var totalBytesRead = 0; @@ -37,11 +39,11 @@ namespace MQTTnet.Serializer return new MqttFixedHeader(buffer[0], 0); } - var bodyLength = await ReadBodyLengthAsync(channel, buffer[1], cancellationToken).ConfigureAwait(false); + var bodyLength = ReadBodyLength(channel, buffer[1], cancellationToken); return new MqttFixedHeader(buffer[0], bodyLength); } - private static async Task ReadBodyLengthAsync(IMqttChannel channel, byte initialEncodedByte, CancellationToken cancellationToken) + private static int ReadBodyLength(IMqttChannel channel, byte initialEncodedByte, CancellationToken cancellationToken) { // Alorithm taken from https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html. var multiplier = 128; @@ -50,7 +52,11 @@ namespace MQTTnet.Serializer while ((encodedByte & 128) != 0) { - encodedByte = await ReadByteAsync(channel, cancellationToken).ConfigureAwait(false); + // Here the async/await pattern is not used becuase the overhead of context switches + // is too big for reading 1 byte in a row. We expect that the remaining data was sent + // directly after the initial bytes. If the client disconnects just in this moment we + // will get an exception anyway. + encodedByte = ReadByteAsync(channel, cancellationToken).GetAwaiter().GetResult(); value += (byte)(encodedByte & 127) * multiplier; if (multiplier > 128 * 128 * 128) diff --git a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs index 2acbfbb..811acf7 100644 --- a/Source/MQTTnet/Serializer/MqttPacketSerializer.cs +++ b/Source/MQTTnet/Serializer/MqttPacketSerializer.cs @@ -2,7 +2,6 @@ using MQTTnet.Packets; using MQTTnet.Protocol; using System; -using System.IO; using System.Linq; using MQTTnet.Adapter; @@ -18,57 +17,46 @@ namespace MQTTnet.Serializer { if (packet == null) throw new ArgumentNullException(nameof(packet)); - using (var stream = new MemoryStream(128)) - { - // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) - stream.Seek(5, SeekOrigin.Begin); + var packetWriter = new MqttPacketWriter(); - var fixedHeader = SerializePacket(packet, stream); - var remainingLength = (int)stream.Length - 5; + // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) + packetWriter.Seek(5); - var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength); + var fixedHeader = SerializePacket(packet, packetWriter); + var remainingLength = packetWriter.Length - 5; - var headerSize = FixedHeaderSize + remainingLengthBuffer.Count; - var headerOffset = 5 - headerSize; + var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength); - // Position cursor on correct offset on beginining of array (has leading 0x0) - stream.Seek(headerOffset, SeekOrigin.Begin); - stream.WriteByte(fixedHeader); - stream.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); + var headerSize = FixedHeaderSize + remainingLengthBuffer.Count; + var headerOffset = 5 - headerSize; -#if NET461 || NET452 || NETSTANDARD2_0 - var buffer = stream.GetBuffer(); - return new ArraySegment(buffer, headerOffset, (int)stream.Length - headerOffset); -#else - if (stream.TryGetBuffer(out var segment)) - { - return new ArraySegment(segment.Array, headerOffset, segment.Count - headerOffset); - } + // Position cursor on correct offset on beginining of array (has leading 0x0) + packetWriter.Seek(headerOffset); + packetWriter.Write(fixedHeader); + packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); - var buffer = stream.ToArray(); - return new ArraySegment(buffer, headerOffset, buffer.Length - headerOffset); -#endif - } + var buffer = packetWriter.GetBuffer(); + return new ArraySegment(buffer, headerOffset, packetWriter.Length - headerOffset); } - private byte SerializePacket(MqttBasePacket packet, Stream stream) + private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter packetWriter) { switch (packet) { - case MqttConnectPacket connectPacket: return Serialize(connectPacket, stream); - case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, stream); + case MqttConnectPacket connectPacket: return Serialize(connectPacket, packetWriter); + case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, packetWriter); case MqttDisconnectPacket _: return SerializeEmptyPacket(MqttControlPacketType.Disconnect); case MqttPingReqPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingReq); case MqttPingRespPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingResp); - case MqttPublishPacket publishPacket: return Serialize(publishPacket, stream); - case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, stream); - case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, stream); - case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, stream); - case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, stream); - case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, stream); - case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, stream); - case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, stream); - case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, stream); + case MqttPublishPacket publishPacket: return Serialize(publishPacket, packetWriter); + case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, packetWriter); + case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, packetWriter); + case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, packetWriter); + case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, packetWriter); + case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, packetWriter); + case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, packetWriter); + case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, packetWriter); + case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, packetWriter); default: throw new MqttProtocolViolationException("Packet type invalid."); } } @@ -195,10 +183,9 @@ namespace MQTTnet.Serializer var body = receivedMqttPacket.Body; ThrowIfBodyIsEmpty(body); - var fixedHeader = new ByteReader(receivedMqttPacket.FixedHeader); - var retain = fixedHeader.Read(); - var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); - var dup = fixedHeader.Read(); + var retain = (receivedMqttPacket.FixedHeader & 0x1) > 0; + var qualityOfServiceLevel = (MqttQualityOfServiceLevel)(receivedMqttPacket.FixedHeader >> 1 & 0x3); + var dup = (receivedMqttPacket.FixedHeader & 0x3) > 0; var topic = body.ReadStringWithLengthPrefix(); @@ -253,8 +240,8 @@ namespace MQTTnet.Serializer throw new MqttProtocolViolationException($"Protocol name ({protocolName}) is not supported."); } - var connectFlags = new ByteReader(body.ReadByte()); - if (connectFlags.Read()) + var connectFlags = body.ReadByte(); + if ((connectFlags & 0x1) > 0) { throw new MqttProtocolViolationException("The first bit of the Connect Flags must be set to 0."); } @@ -262,14 +249,14 @@ namespace MQTTnet.Serializer var packet = new MqttConnectPacket { ProtocolVersion = protocolVersion, - CleanSession = connectFlags.Read() + CleanSession = (connectFlags & 0x2) > 0 }; - var willFlag = connectFlags.Read(); - var willQoS = connectFlags.Read(2); - var willRetain = connectFlags.Read(); - var passwordFlag = connectFlags.Read(); - var usernameFlag = connectFlags.Read(); + var willFlag = (connectFlags & 0x4) > 0; + var willQoS = (connectFlags & 0x18) >> 3; + var willRetain = (connectFlags & 0x20) > 0; + var passwordFlag = (connectFlags & 0x40) > 0; + var usernameFlag = (connectFlags & 0x80) > 0; packet.KeepAlivePeriod = body.ReadUInt16(); packet.ClientId = body.ReadStringWithLengthPrefix(); @@ -322,11 +309,11 @@ namespace MQTTnet.Serializer var packet = new MqttConnAckPacket(); - var firstByteReader = new ByteReader(body.ReadByte()); - + var acknowledgeFlags = body.ReadByte(); + if (ProtocolVersion == MqttProtocolVersion.V311) { - packet.IsSessionPresent = firstByteReader.Read(); + packet.IsSessionPresent = (acknowledgeFlags & 0x1) > 0; } packet.ConnectReturnCode = (MqttConnectReturnCode)body.ReadByte(); @@ -344,119 +331,129 @@ namespace MQTTnet.Serializer } } + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local private static void ValidatePublishPacket(MqttPublishPacket packet) { - if (packet == null) throw new ArgumentNullException(nameof(packet)); - if (packet.QualityOfServiceLevel == 0 && packet.Dup) { throw new MqttProtocolViolationException("Dup flag must be false for QoS 0 packets [MQTT-3.3.1-2]."); } } - private byte Serialize(MqttConnectPacket packet, Stream stream) + private byte Serialize(MqttConnectPacket packet, MqttPacketWriter packetWriter) { ValidateConnectPacket(packet); // Write variable header if (ProtocolVersion == MqttProtocolVersion.V311) { - stream.WriteWithLengthPrefix("MQTT"); - stream.WriteByte(4); // 3.1.2.2 Protocol Level 4 + packetWriter.WriteWithLengthPrefix("MQTT"); + packetWriter.Write(4); // 3.1.2.2 Protocol Level 4 } else { - stream.WriteWithLengthPrefix("MQIsdp"); - stream.WriteByte(3); // Protocol Level 3 + packetWriter.WriteWithLengthPrefix("MQIsdp"); + packetWriter.Write(3); // Protocol Level 3 } - var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags - connectFlags.Write(false); // Reserved - connectFlags.Write(packet.CleanSession); - connectFlags.Write(packet.WillMessage != null); - - if (packet.WillMessage != null) + byte connectFlags = 0x0; + if (packet.CleanSession) { - connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2); - connectFlags.Write(packet.WillMessage.Retain); + connectFlags |= 0x2; } - else + + if (packet.WillMessage != null) { - connectFlags.Write(0, 2); - connectFlags.Write(false); - } + connectFlags |= 0x4; + connectFlags |= (byte)((byte)packet.WillMessage.QualityOfServiceLevel << 3); + if (packet.WillMessage.Retain) + { + connectFlags |= 0x20; + } + } + if (packet.Password != null && packet.Username == null) { throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22]."); } - connectFlags.Write(packet.Password != null); - connectFlags.Write(packet.Username != null); + if (packet.Password != null) + { + connectFlags |= 0x40; + } - stream.Write(connectFlags); - stream.Write(packet.KeepAlivePeriod); - stream.WriteWithLengthPrefix(packet.ClientId); + if (packet.Username != null) + { + connectFlags |= 0x80; + } + + packetWriter.Write(connectFlags); + packetWriter.Write(packet.KeepAlivePeriod); + packetWriter.WriteWithLengthPrefix(packet.ClientId); if (packet.WillMessage != null) { - stream.WriteWithLengthPrefix(packet.WillMessage.Topic); - stream.WriteWithLengthPrefix(packet.WillMessage.Payload); + packetWriter.WriteWithLengthPrefix(packet.WillMessage.Topic); + packetWriter.WriteWithLengthPrefix(packet.WillMessage.Payload); } if (packet.Username != null) { - stream.WriteWithLengthPrefix(packet.Username); + packetWriter.WriteWithLengthPrefix(packet.Username); } if (packet.Password != null) { - stream.WriteWithLengthPrefix(packet.Password); + packetWriter.WriteWithLengthPrefix(packet.Password); } return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect); } - private byte Serialize(MqttConnAckPacket packet, Stream stream) + private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter packetWriter) { if (ProtocolVersion == MqttProtocolVersion.V310) { - stream.WriteByte(0); + packetWriter.Write(0); } else if (ProtocolVersion == MqttProtocolVersion.V311) { - var connectAcknowledgeFlags = new ByteWriter(); - connectAcknowledgeFlags.Write(packet.IsSessionPresent); - - stream.Write(connectAcknowledgeFlags); + byte connectAcknowledgeFlags = 0x0; + if (packet.IsSessionPresent) + { + connectAcknowledgeFlags |= 0x1; + } + + packetWriter.Write(connectAcknowledgeFlags); } else { throw new MqttProtocolViolationException("Protocol version not supported."); } - stream.WriteByte((byte)packet.ConnectReturnCode); + packetWriter.Write((byte)packet.ConnectReturnCode); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck); } - private static byte Serialize(MqttPubRelPacket packet, Stream stream) + private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("PubRel packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02); } - private static byte Serialize(MqttPublishPacket packet, Stream stream) + private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter packetWriter) { ValidatePublishPacket(packet); - stream.WriteWithLengthPrefix(packet.Topic); + packetWriter.WriteWithLengthPrefix(packet.Topic); if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { @@ -465,7 +462,7 @@ namespace MQTTnet.Serializer throw new MqttProtocolViolationException("Publish packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); } else { @@ -477,7 +474,7 @@ namespace MQTTnet.Serializer if (packet.Payload?.Length > 0) { - stream.Write(packet.Payload, 0, packet.Payload.Length); + packetWriter.Write(packet.Payload, 0, packet.Payload.Length); } byte fixedHeader = 0; @@ -497,43 +494,43 @@ namespace MQTTnet.Serializer return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader); } - private static byte Serialize(MqttPubAckPacket packet, Stream stream) + private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("PubAck packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck); } - private static byte Serialize(MqttPubRecPacket packet, Stream stream) + private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("PubRec packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec); } - private static byte Serialize(MqttPubCompPacket packet, Stream stream) + private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("PubComp packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp); } - private static byte Serialize(MqttSubscribePacket packet, Stream stream) + private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter packetWriter) { if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3]."); @@ -542,41 +539,41 @@ namespace MQTTnet.Serializer throw new MqttProtocolViolationException("Subscribe packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); if (packet.TopicFilters?.Count > 0) { foreach (var topicFilter in packet.TopicFilters) { - stream.WriteWithLengthPrefix(topicFilter.Topic); - stream.WriteByte((byte)topicFilter.QualityOfServiceLevel); + packetWriter.WriteWithLengthPrefix(topicFilter.Topic); + packetWriter.Write((byte)topicFilter.QualityOfServiceLevel); } } return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02); } - private static byte Serialize(MqttSubAckPacket packet, Stream stream) + private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("SubAck packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); if (packet.SubscribeReturnCodes?.Any() == true) { foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes) { - stream.WriteByte((byte)packetSubscribeReturnCode); + packetWriter.Write((byte)packetSubscribeReturnCode); } } return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck); } - private static byte Serialize(MqttUnsubscribePacket packet, Stream stream) + private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter packetWriter) { if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2]."); @@ -585,27 +582,27 @@ namespace MQTTnet.Serializer throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); if (packet.TopicFilters?.Any() == true) { foreach (var topicFilter in packet.TopicFilters) { - stream.WriteWithLengthPrefix(topicFilter); + packetWriter.WriteWithLengthPrefix(topicFilter); } } return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02); } - private static byte Serialize(MqttUnsubAckPacket packet, Stream stream) + private static byte Serialize(MqttUnsubAckPacket packet, MqttPacketWriter packetWriter) { if (!packet.PacketIdentifier.HasValue) { throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier."); } - stream.Write(packet.PacketIdentifier.Value); + packetWriter.Write(packet.PacketIdentifier.Value); return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck); } @@ -614,6 +611,7 @@ namespace MQTTnet.Serializer return MqttPacketWriter.BuildFixedHeader(type); } + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local private static void ThrowIfBodyIsEmpty(MqttPacketBodyReader body) { if (body == null || body.Length == 0) diff --git a/Source/MQTTnet/Serializer/MqttPacketWriter.cs b/Source/MQTTnet/Serializer/MqttPacketWriter.cs index 2cc9d7b..c0c49fc 100644 --- a/Source/MQTTnet/Serializer/MqttPacketWriter.cs +++ b/Source/MQTTnet/Serializer/MqttPacketWriter.cs @@ -1,12 +1,23 @@ using System; -using System.IO; using System.Text; using MQTTnet.Protocol; namespace MQTTnet.Serializer { - public static class MqttPacketWriter + /// + /// This is a custom implementation of a memory stream which provides only MQTTnet relevant features. + /// The goal is to avoid lots of argument checks like in the original stream. The growth rule is the + /// same as for the original MemoryStream in .net. Also this implementation allows accessing the internal + /// buffer for all platforms and .net framework versions (which is not available at the regular MemoryStream). + /// + public class MqttPacketWriter { + private byte[] _buffer = new byte[128]; + + private int _position; + + public int Length { get; private set; } + public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0) { var fixedHeader = (int)packetType << 4; @@ -14,33 +25,6 @@ namespace MQTTnet.Serializer return (byte)fixedHeader; } - public static void Write(this Stream stream, ushort value) - { - var buffer = BitConverter.GetBytes(value); - stream.WriteByte(buffer[1]); - stream.WriteByte(buffer[0]); - } - - public static void Write(this Stream stream, ByteWriter value) - { - if (value == null) throw new ArgumentNullException(nameof(value)); - - stream.WriteByte(value.Value); - } - - public static void WriteWithLengthPrefix(this Stream stream, string value) - { - stream.WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty)); - } - - public static void WriteWithLengthPrefix(this Stream stream, byte[] value) - { - var length = (ushort)value.Length; - - stream.Write(length); - stream.Write(value, 0, length); - } - public static ArraySegment EncodeRemainingLength(int length) { // write the encoded remaining length right aligned on the 4 byte buffer @@ -69,5 +53,91 @@ namespace MQTTnet.Serializer return new ArraySegment(buffer, 0, bufferOffset); } + + public void WriteWithLengthPrefix(string value) + { + WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty)); + } + + public void WriteWithLengthPrefix(byte[] value) + { + EnsureAdditionalCapacity(value.Length + 2); + + Write((ushort)value.Length); + Write(value, 0, value.Length); + } + + public void Write(byte @byte) + { + EnsureAdditionalCapacity(1); + + _buffer[_position] = @byte; + IncreasePostition(1); + } + + public void Write(ushort value) + { + EnsureAdditionalCapacity(2); + + _buffer[_position] = (byte)(value >> 8); + IncreasePostition(1); + _buffer[_position] = (byte)value; + IncreasePostition(1); + } + + public void Write(byte[] array, int offset, int count) + { + EnsureAdditionalCapacity(count); + + Array.Copy(array, offset, _buffer, _position, count); + IncreasePostition(count); + } + + public void Seek(int offset) + { + EnsureCapacity(offset); + _position = offset; + } + + public byte[] GetBuffer() + { + return _buffer; + } + + private void EnsureAdditionalCapacity(int additionalCapacity) + { + var freeSpace = _buffer.Length - _position; + if (freeSpace >= additionalCapacity) + { + return; + } + + EnsureCapacity(additionalCapacity - freeSpace); + } + + private void EnsureCapacity(int capacity) + { + if (_buffer.Length >= capacity) + { + return; + } + + var newBufferLength = _buffer.Length; + while (newBufferLength < capacity) + { + newBufferLength *= 2; + } + + Array.Resize(ref _buffer, newBufferLength); + } + + private void IncreasePostition(int length) + { + _position += length; + if (_position > Length) + { + Length = _position; + } + } } } diff --git a/Source/MQTTnet/Server/MqttClientPendingMessagesQueue.cs b/Source/MQTTnet/Server/MqttClientPendingPacketsQueue.cs similarity index 93% rename from Source/MQTTnet/Server/MqttClientPendingMessagesQueue.cs rename to Source/MQTTnet/Server/MqttClientPendingPacketsQueue.cs index ff43e6f..7a55c0b 100644 --- a/Source/MQTTnet/Server/MqttClientPendingMessagesQueue.cs +++ b/Source/MQTTnet/Server/MqttClientPendingPacketsQueue.cs @@ -11,7 +11,7 @@ using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttClientPendingMessagesQueue : IDisposable + public class MqttClientPendingPacketsQueue : IDisposable { private readonly AsyncAutoResetEvent _queueAutoResetEvent = new AsyncAutoResetEvent(); private readonly IMqttServerOptions _options; @@ -20,13 +20,13 @@ namespace MQTTnet.Server private ConcurrentQueue _queue = new ConcurrentQueue(); - public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger) + public MqttClientPendingPacketsQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _options = options ?? throw new ArgumentNullException(nameof(options)); _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); - _logger = logger.CreateChildLogger(nameof(MqttClientPendingMessagesQueue)); + _logger = logger.CreateChildLogger(nameof(MqttClientPendingPacketsQueue)); } public int Count => _queue.Count; @@ -115,7 +115,7 @@ namespace MQTTnet.Server return; } - await adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, packet, cancellationToken).ConfigureAwait(false); + await adapter.SendPacketAsync(packet, cancellationToken).ConfigureAwait(false); _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); } diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 28e97f0..460bea4 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -18,7 +18,7 @@ namespace MQTTnet.Server private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; - private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; + private readonly MqttClientPendingPacketsQueue _pendingPacketsQueue; private readonly MqttClientSubscriptionsManager _subscriptionsManager; private readonly MqttClientSessionsManager _sessionsManager; @@ -49,7 +49,7 @@ namespace MQTTnet.Server _keepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, () => Stop(MqttClientDisconnectType.NotClean), _logger); _subscriptionsManager = new MqttClientSubscriptionsManager(clientId, _options, sessionsManager.Server); - _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger); + _pendingPacketsQueue = new MqttClientPendingPacketsQueue(_options, this, _logger); } public string ClientId { get; } @@ -60,7 +60,7 @@ namespace MQTTnet.Server status.IsConnected = _adapter != null; status.Endpoint = _adapter?.Endpoint; status.ProtocolVersion = _adapter?.PacketSerializer?.ProtocolVersion; - status.PendingApplicationMessagesCount = _pendingMessagesQueue.Count; + status.PendingApplicationMessagesCount = _pendingPacketsQueue.Count; status.LastPacketReceived = _keepAliveMonitor.LastPacketReceived; status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; } @@ -80,7 +80,7 @@ namespace MQTTnet.Server _wasCleanDisconnect = false; _willMessage = connectPacket.WillMessage; - _pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token); + _pendingPacketsQueue.Start(adapter, _cancellationTokenSource.Token); _keepAliveMonitor.Start(connectPacket.KeepAlivePeriod, _cancellationTokenSource.Token); while (!_cancellationTokenSource.IsCancellationRequested) @@ -149,13 +149,10 @@ namespace MQTTnet.Server if (_willMessage != null && !_wasCleanDisconnect) { - _sessionsManager.StartDispatchApplicationMessage(this, _willMessage); + _sessionsManager.EnqueueApplicationMessage(this, _willMessage); } _willMessage = null; - - ////_pendingMessagesQueue.WaitForCompletion(); - ////_keepAliveMonitor.WaitForCompletion(); } finally { @@ -196,7 +193,7 @@ namespace MQTTnet.Server } } - _pendingMessagesQueue.Enqueue(publishPacket); + _pendingPacketsQueue.Enqueue(publishPacket); } public Task SubscribeAsync(IList topicFilters) @@ -226,12 +223,12 @@ namespace MQTTnet.Server public void ClearPendingApplicationMessages() { - _pendingMessagesQueue.Clear(); + _pendingPacketsQueue.Clear(); } public void Dispose() { - _pendingMessagesQueue?.Dispose(); + _pendingPacketsQueue?.Dispose(); _cancellationTokenSource?.Dispose(); } @@ -245,7 +242,7 @@ namespace MQTTnet.Server if (packet is MqttPingReqPacket) { - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, new MqttPingRespPacket(), cancellationToken); + return adapter.SendPacketAsync(new MqttPingRespPacket(), cancellationToken); } if (packet is MqttPubRelPacket pubRelPacket) @@ -260,7 +257,7 @@ namespace MQTTnet.Server PacketIdentifier = pubRecPacket.PacketIdentifier }; - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, responsePacket, cancellationToken); + return adapter.SendPacketAsync(responsePacket, cancellationToken); } if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) @@ -308,7 +305,7 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket); - await adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, subscribeResult.ResponsePacket, cancellationToken).ConfigureAwait(false); + await adapter.SendPacketAsync(subscribeResult.ResponsePacket, cancellationToken).ConfigureAwait(false); if (subscribeResult.CloseConnection) { @@ -322,7 +319,7 @@ namespace MQTTnet.Server private Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) { var unsubscribeResult = _subscriptionsManager.Unsubscribe(unsubscribePacket); - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, unsubscribeResult, cancellationToken); + return adapter.SendPacketAsync(unsubscribeResult, cancellationToken); } private Task HandleIncomingPublishPacketAsync(IMqttChannelAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken) @@ -333,7 +330,7 @@ namespace MQTTnet.Server { case MqttQualityOfServiceLevel.AtMostOnce: { - _sessionsManager.StartDispatchApplicationMessage(this, applicationMessage); + _sessionsManager.EnqueueApplicationMessage(this, applicationMessage); return Task.FromResult(0); } case MqttQualityOfServiceLevel.AtLeastOnce: @@ -353,25 +350,25 @@ namespace MQTTnet.Server private Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) { - _sessionsManager.StartDispatchApplicationMessage(this, applicationMessage); + _sessionsManager.EnqueueApplicationMessage(this, applicationMessage); var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }; - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, response, cancellationToken); + return adapter.SendPacketAsync(response, cancellationToken); } private Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken) { // QoS 2 is implement as method "B" (4.3.3 QoS 2: Exactly once delivery) - _sessionsManager.StartDispatchApplicationMessage(this, applicationMessage); + _sessionsManager.EnqueueApplicationMessage(this, applicationMessage); var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }; - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, response, cancellationToken); + return adapter.SendPacketAsync(response, cancellationToken); } - private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) + private static Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) { var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }; - return adapter.SendPacketAsync(_options.DefaultCommunicationTimeout, response, cancellationToken); + return adapter.SendPacketAsync(response, cancellationToken); } private void OnAdapterReadingPacketCompleted(object sender, EventArgs e) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index c52c706..1cb301a 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -6,27 +6,29 @@ using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; -using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttClientSessionsManager : IDisposable + public class MqttClientSessionsManager { + private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly ConcurrentDictionary _sessions = new ConcurrentDictionary(); - private readonly AsyncLock _sessionPreparationLock = new AsyncLock(); + + private readonly CancellationToken _cancellationToken; private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; - public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) + public MqttClientSessionsManager(IMqttServerOptions options, MqttServer server, MqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientSessionsManager)); + _cancellationToken = cancellationToken; _options = options ?? throw new ArgumentNullException(nameof(options)); Server = server ?? throw new ArgumentNullException(nameof(server)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); @@ -34,7 +36,129 @@ namespace MQTTnet.Server public MqttServer Server { get; } - public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) + public void Start() + { + Task.Factory.StartNew(() => ProcessQueuedApplicationMessages(_cancellationToken), _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); + } + + public Task StopAsync() + { + foreach (var session in _sessions) + { + session.Value.Stop(MqttClientDisconnectType.NotClean); + } + + _sessions.Clear(); + return Task.FromResult(0); + } + + public Task StartSession(IMqttChannelAdapter clientAdapter) + { + return Task.Run(() => RunSession(clientAdapter, _cancellationToken), _cancellationToken); + } + + public Task> GetClientStatusAsync() + { + var result = new List(); + foreach (var session in _sessions) + { + var status = new MqttClientSessionStatus(this, session.Value); + session.Value.FillStatus(status); + + result.Add(status); + } + + return Task.FromResult((IList)result); + } + + public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); + + _messageQueue.Add(new MqttEnqueuedApplicationMessage(senderClientSession, applicationMessage), _cancellationToken); + } + + public Task SubscribeAsync(string clientId, IList topicFilters) + { + if (clientId == null) throw new ArgumentNullException(nameof(clientId)); + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + if (!_sessions.TryGetValue(clientId, out var session)) + { + throw new InvalidOperationException($"Client session '{clientId}' is unknown."); + } + + return session.SubscribeAsync(topicFilters); + } + + public Task UnsubscribeAsync(string clientId, IList topicFilters) + { + if (clientId == null) throw new ArgumentNullException(nameof(clientId)); + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + if (!_sessions.TryGetValue(clientId, out var session)) + { + throw new InvalidOperationException($"Client session '{clientId}' is unknown."); + } + + return session.UnsubscribeAsync(topicFilters); + } + + public void DeleteSession(string clientId) + { + _sessions.TryRemove(clientId, out _); + _logger.Verbose("Session for client '{0}' deleted.", clientId); + } + + private void ProcessQueuedApplicationMessages(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + var enqueuedApplicationMessage = _messageQueue.Take(cancellationToken); + var sender = enqueuedApplicationMessage.Sender; + var applicationMessage = enqueuedApplicationMessage.ApplicationMessage; + + var interceptorContext = InterceptApplicationMessage(sender, applicationMessage); + if (interceptorContext != null) + { + if (interceptorContext.CloseConnection) + { + enqueuedApplicationMessage.Sender.Stop(MqttClientDisconnectType.NotClean); + } + + if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) + { + return; + } + + applicationMessage = interceptorContext.ApplicationMessage; + } + + Server.OnApplicationMessageReceived(sender?.ClientId, applicationMessage); + + if (applicationMessage.Retain) + { + _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).GetAwaiter().GetResult(); + } + + foreach (var clientSession in _sessions.Values) + { + clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage); + } + } + catch (TaskCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Unhandled exception while processing queued application message."); + } + } + } + + private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; var wasCleanDisconnect = false; @@ -60,7 +184,7 @@ namespace MQTTnet.Server var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await clientAdapter.SendPacketAsync(_options.DefaultCommunicationTimeout, + await clientAdapter.SendPacketAsync( new MqttConnAckPacket { ConnectReturnCode = connectReturnCode @@ -70,15 +194,15 @@ namespace MQTTnet.Server return; } - var result = await PrepareClientSessionAsync(connectPacket).ConfigureAwait(false); + var result = PrepareClientSession(connectPacket); var clientSession = result.Session; - await clientAdapter.SendPacketAsync(_options.DefaultCommunicationTimeout, + await clientAdapter.SendPacketAsync( new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = result.IsExistingSession - }, + }, cancellationToken).ConfigureAwait(false); Server.OnClientConnected(clientId); @@ -113,73 +237,6 @@ namespace MQTTnet.Server } } - public Task StopAsync() - { - foreach (var session in _sessions) - { - session.Value.Stop(MqttClientDisconnectType.NotClean); - } - - _sessions.Clear(); - return Task.FromResult(0); - } - - public Task> GetClientStatusAsync() - { - var result = new List(); - foreach (var session in _sessions) - { - var status = new MqttClientSessionStatus(this, session.Value); - session.Value.FillStatus(status); - - result.Add(status); - } - - return Task.FromResult((IList)result); - } - - public void StartDispatchApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) - { - Task.Run(() => DispatchApplicationMessageAsync(senderClientSession, applicationMessage)); - } - - public Task SubscribeAsync(string clientId, IList topicFilters) - { - if (clientId == null) throw new ArgumentNullException(nameof(clientId)); - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - if (!_sessions.TryGetValue(clientId, out var session)) - { - throw new InvalidOperationException($"Client session '{clientId}' is unknown."); - } - - return session.SubscribeAsync(topicFilters); - } - - public Task UnsubscribeAsync(string clientId, IList topicFilters) - { - if (clientId == null) throw new ArgumentNullException(nameof(clientId)); - if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - - if (!_sessions.TryGetValue(clientId, out var session)) - { - throw new InvalidOperationException($"Client session '{clientId}' is unknown."); - } - - return session.UnsubscribeAsync(topicFilters); - } - - public void DeleteSession(string clientId) - { - _sessions.TryRemove(clientId, out _); - _logger.Verbose("Session for client '{0}' deleted.", clientId); - } - - public void Dispose() - { - _sessionPreparationLock?.Dispose(); - } - private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) { if (_options.ConnectionValidator == null) @@ -197,9 +254,9 @@ namespace MQTTnet.Server return context.ReturnCode; } - private async Task PrepareClientSessionAsync(MqttConnectPacket connectPacket) + private PrepareClientSessionResult PrepareClientSession(MqttConnectPacket connectPacket) { - using (await _sessionPreparationLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) + lock (_sessions) { var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var clientSession); if (isSessionPresent) @@ -231,60 +288,19 @@ namespace MQTTnet.Server _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } - return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; + return new PrepareClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; } } - private async Task DispatchApplicationMessageAsync(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) + private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession sender, MqttApplicationMessage applicationMessage) { - try - { - var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage); - if (interceptorContext != null) - { - if (interceptorContext.CloseConnection) - { - senderClientSession.Stop(MqttClientDisconnectType.NotClean); - } - - if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) - { - return; - } - - applicationMessage = interceptorContext.ApplicationMessage; - } - - Server.OnApplicationMessageReceived(senderClientSession?.ClientId, applicationMessage); - - if (applicationMessage.Retain) - { - await _retainedMessagesManager.HandleMessageAsync(senderClientSession?.ClientId, applicationMessage).ConfigureAwait(false); - } - - foreach (var clientSession in _sessions.Values) - { - clientSession.EnqueueApplicationMessage(senderClientSession, applicationMessage); - } - } - catch (Exception exception) - { - _logger.Error(exception, "Error while processing application message"); - } - } - - private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) - { - var interceptorContext = new MqttApplicationMessageInterceptorContext( - senderClientSession?.ClientId, - applicationMessage); - var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) { - return interceptorContext; + return null; } + var interceptorContext = new MqttApplicationMessageInterceptorContext(sender?.ClientId, applicationMessage); interceptor(interceptorContext); return interceptorContext; } diff --git a/Source/MQTTnet/Server/MqttEnqueuedApplicationMessage.cs b/Source/MQTTnet/Server/MqttEnqueuedApplicationMessage.cs new file mode 100644 index 0000000..20ff2fe --- /dev/null +++ b/Source/MQTTnet/Server/MqttEnqueuedApplicationMessage.cs @@ -0,0 +1,15 @@ +namespace MQTTnet.Server +{ + public class MqttEnqueuedApplicationMessage + { + public MqttEnqueuedApplicationMessage(MqttClientSession sender, MqttApplicationMessage applicationMessage) + { + Sender = sender; + ApplicationMessage = applicationMessage; + } + + public MqttClientSession Sender { get; } + + public MqttApplicationMessage ApplicationMessage { get; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index a38dfd2..38e631e 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -65,7 +65,7 @@ namespace MQTTnet.Server if (_cancellationTokenSource == null) throw new InvalidOperationException("The server is not started."); - _clientSessionsManager.StartDispatchApplicationMessage(null, applicationMessage); + _clientSessionsManager.EnqueueApplicationMessage(null, applicationMessage); return Task.FromResult(0); } @@ -81,7 +81,8 @@ namespace MQTTnet.Server _retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); - _clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _logger); + _clientSessionsManager = new MqttClientSessionsManager(Options, this, _retainedMessagesManager, _cancellationTokenSource.Token, _logger); + _clientSessionsManager.Start(); foreach (var adapter in _adapters) { @@ -118,8 +119,6 @@ namespace MQTTnet.Server } finally { - _clientSessionsManager?.Dispose(); - _cancellationTokenSource = null; _retainedMessagesManager = null; _clientSessionsManager = null; @@ -155,9 +154,7 @@ namespace MQTTnet.Server private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) { - eventArgs.SessionTask = Task.Run( - () => _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), - _cancellationTokenSource.Token); + eventArgs.SessionTask = _clientSessionsManager.StartSession(eventArgs.Client); } } } diff --git a/Source/MQTTnet/Server/GetOrCreateClientSessionResult.cs b/Source/MQTTnet/Server/PrepareClientSessionResult.cs similarity index 76% rename from Source/MQTTnet/Server/GetOrCreateClientSessionResult.cs rename to Source/MQTTnet/Server/PrepareClientSessionResult.cs index 975d237..9a655be 100644 --- a/Source/MQTTnet/Server/GetOrCreateClientSessionResult.cs +++ b/Source/MQTTnet/Server/PrepareClientSessionResult.cs @@ -1,6 +1,6 @@ namespace MQTTnet.Server { - public class GetOrCreateClientSessionResult + public class PrepareClientSessionResult { public bool IsExistingSession { get; set; } diff --git a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs index 20b581c..7f068d6 100644 --- a/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs @@ -65,7 +65,7 @@ namespace MQTTnet.Benchmarks for (var i = 0; i < 10000; i++) { - _channelAdapter.SendPacketAsync(TimeSpan.FromSeconds(15), _packet, CancellationToken.None).GetAwaiter().GetResult(); + _channelAdapter.SendPacketAsync(_packet, CancellationToken.None).GetAwaiter().GetResult(); } _stream.Position = 0; diff --git a/Tests/MQTTnet.Core.Tests/ByteReaderTests.cs b/Tests/MQTTnet.Core.Tests/ByteReaderTests.cs deleted file mode 100644 index e2173cb..0000000 --- a/Tests/MQTTnet.Core.Tests/ByteReaderTests.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Serializer; - -namespace MQTTnet.Core.Tests -{ - [TestClass] - public class ByteReaderTests - { - [TestMethod] - public void ByteReader_ReadToEnd() - { - var reader = new ByteReader(85); - Assert.IsTrue(reader.Read()); - Assert.IsFalse(reader.Read()); - Assert.IsTrue(reader.Read()); - Assert.IsFalse(reader.Read()); - Assert.IsTrue(reader.Read()); - Assert.IsFalse(reader.Read()); - Assert.IsTrue(reader.Read()); - Assert.IsFalse(reader.Read()); - } - - [TestMethod] - public void ByteReader_ReadPartial() - { - var reader = new ByteReader(15); - Assert.AreEqual(3, reader.Read(2)); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/ByteWriterTests.cs b/Tests/MQTTnet.Core.Tests/ByteWriterTests.cs deleted file mode 100644 index 881df5c..0000000 --- a/Tests/MQTTnet.Core.Tests/ByteWriterTests.cs +++ /dev/null @@ -1,51 +0,0 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Serializer; - -namespace MQTTnet.Core.Tests -{ - [TestClass] - public class ByteWriterTests - { - [TestMethod] - public void ByteWriter_WriteMultipleAll() - { - var b = new ByteWriter(); - Assert.AreEqual(0, b.Value); - b.Write(3, 2); - Assert.AreEqual(3, b.Value); - } - - [TestMethod] - public void ByteWriter_WriteMultiplePartial() - { - var b = new ByteWriter(); - Assert.AreEqual(0, b.Value); - b.Write(255, 2); - Assert.AreEqual(3, b.Value); - } - - [TestMethod] - public void ByteWriter_WriteTo0xFF() - { - var b = new ByteWriter(); - - Assert.AreEqual(0, b.Value); - b.Write(true); - Assert.AreEqual(1, b.Value); - b.Write(true); - Assert.AreEqual(3, b.Value); - b.Write(true); - Assert.AreEqual(7, b.Value); - b.Write(true); - Assert.AreEqual(15, b.Value); - b.Write(true); - Assert.AreEqual(31, b.Value); - b.Write(true); - Assert.AreEqual(63, b.Value); - b.Write(true); - Assert.AreEqual(127, b.Value); - b.Write(true); - Assert.AreEqual(255, b.Value); - } - } -} diff --git a/Tests/MQTTnet.Core.Tests/ExtensionTests.cs b/Tests/MQTTnet.Core.Tests/ExtensionTests.cs index 696cfa3..fdb5888 100644 --- a/Tests/MQTTnet.Core.Tests/ExtensionTests.cs +++ b/Tests/MQTTnet.Core.Tests/ExtensionTests.cs @@ -14,20 +14,20 @@ namespace MQTTnet.Core.Tests [TestMethod] public async Task TimeoutAfter() { - await MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Delay(TimeSpan.FromMilliseconds(500), ct), TimeSpan.FromMilliseconds(100), CancellationToken.None); + await MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Delay(TimeSpan.FromMilliseconds(500), ct), TimeSpan.FromMilliseconds(100), CancellationToken.None); } [ExpectedException(typeof(MqttCommunicationTimedOutException))] [TestMethod] public async Task TimeoutAfterWithResult() { - await MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Delay(TimeSpan.FromMilliseconds(500), ct).ContinueWith(t => 5, ct), TimeSpan.FromMilliseconds(100), CancellationToken.None); + await MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Delay(TimeSpan.FromMilliseconds(500), ct).ContinueWith(t => 5, ct), TimeSpan.FromMilliseconds(100), CancellationToken.None); } [TestMethod] public async Task TimeoutAfterCompleteInTime() { - var result = await MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Delay(TimeSpan.FromMilliseconds(100), ct).ContinueWith(t => 5, ct), TimeSpan.FromMilliseconds(500), CancellationToken.None); + var result = await MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Delay(TimeSpan.FromMilliseconds(100), ct).ContinueWith(t => 5, ct), TimeSpan.FromMilliseconds(500), CancellationToken.None); Assert.AreEqual(5, result); } @@ -36,7 +36,7 @@ namespace MQTTnet.Core.Tests { try { - await MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Run(() => + await MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Run(() => { var iis = new int[0]; iis[1] = 0; @@ -55,7 +55,7 @@ namespace MQTTnet.Core.Tests { try { - await MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Run(() => + await MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Run(() => { var iis = new int[0]; iis[1] = 0; @@ -76,7 +76,7 @@ namespace MQTTnet.Core.Tests var tasks = Enumerable.Range(0, 100000) .Select(i => { - return MQTTnet.Internal.TaskExtensions.TimeoutAfter(ct => Task.Delay(TimeSpan.FromMilliseconds(1), ct), TimeSpan.FromMinutes(1), CancellationToken.None); + return MQTTnet.Internal.TaskExtensions.TimeoutAfterAsync(ct => Task.Delay(TimeSpan.FromMilliseconds(1), ct), TimeSpan.FromMinutes(1), CancellationToken.None); }); await Task.WhenAll(tasks); diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs index e3e1ccd..f983622 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketReaderTests.cs @@ -11,7 +11,7 @@ namespace MQTTnet.Core.Tests public class MqttPacketReaderTests { [TestMethod] - [ExpectedException(typeof(MqttCommunicationException))] + [ExpectedException(typeof(MqttCommunicationClosedGracefullyException))] public void MqttPacketReader_EmptyStream() { MqttPacketReader.ReadFixedHeaderAsync(new TestMqttChannel(new MemoryStream()), CancellationToken.None).GetAwaiter().GetResult(); diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index ffe2a77..9375f07 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; @@ -36,7 +35,7 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task SendPacketAsync(TimeSpan timeout, MqttBasePacket packet, CancellationToken cancellationToken) + public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) { ThrowIfPartnerIsNull(); diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index e9e4bc4..752feb5 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -12,16 +12,46 @@ namespace MQTTnet.TestApp.NetCore { public static class PerformanceTest { - public static async Task RunAsync() + public static void Run() { - Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch."); - var concurrent = Console.ReadKey(true).KeyChar == 'c'; + try + { + var mqttServer = new MqttFactory().CreateMqttServer(); + mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult(); - var server = Task.Run(RunServerAsync); - await Task.Delay(1000); - var client = Task.Run(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), concurrent)); + var options = new MqttClientOptions + { + ChannelOptions = new MqttClientTcpOptions + { + Server = "127.0.0.1" + }, + CleanSession = true + }; + + var client = new MqttFactory().CreateMqttClient(); + client.ConnectAsync(options).GetAwaiter().GetResult(); + + var message = CreateMessage(); + var stopwatch = new Stopwatch(); - await Task.WhenAll(server, client).ConfigureAwait(false); + for (var i = 0; i < 10; i++) + { + stopwatch.Restart(); + + var sentMessagesCount = 0; + while (stopwatch.ElapsedMilliseconds < 1000) + { + client.PublishAsync(message).GetAwaiter().GetResult(); + sentMessagesCount++; + } + + Console.WriteLine($"Sending {sentMessagesCount} messages per second. #" + (i + 1)); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } } private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, bool concurrent) @@ -53,29 +83,8 @@ namespace MQTTnet.TestApp.NetCore } var message = CreateMessage(); - var messages = new[] { message }; - var stopwatch = Stopwatch.StartNew(); - var sentMessagesCount = 0; - while (stopwatch.ElapsedMilliseconds < 1000) - { - client.PublishAsync(messages).GetAwaiter().GetResult(); - sentMessagesCount++; - } - - Console.WriteLine($"Sending {sentMessagesCount} messages per second. #1"); - - sentMessagesCount = 0; - stopwatch.Restart(); - while (stopwatch.ElapsedMilliseconds < 1000) - { - await client.PublishAsync(messages).ConfigureAwait(false); - sentMessagesCount++; - } - - Console.WriteLine($"Sending {sentMessagesCount} messages per second. #2"); - var testMessageCount = 10000; for (var i = 0; i < testMessageCount; i++) { @@ -142,38 +151,5 @@ namespace MQTTnet.TestApp.NetCore Interlocked.Increment(ref count); return Task.Run(() => client.PublishAsync(applicationMessage)); } - - private static async Task RunServerAsync() - { - try - { - var mqttServer = new MqttFactory().CreateMqttServer(); - - ////var msgs = 0; - ////var stopwatch = Stopwatch.StartNew(); - ////mqttServer.ApplicationMessageReceived += (sender, args) => - ////{ - //// msgs++; - //// if (stopwatch.ElapsedMilliseconds > 1000) - //// { - //// Console.WriteLine($"received {msgs}"); - //// msgs = 0; - //// stopwatch.Restart(); - //// } - ////}; - await mqttServer.StartAsync(new MqttServerOptions()); - - Console.WriteLine("Press any key to exit."); - Console.ReadLine(); - - await mqttServer.StopAsync().ConfigureAwait(false); - } - catch (Exception e) - { - Console.WriteLine(e); - } - - Console.ReadLine(); - } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index f8a5d27..25302c7 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -34,7 +34,8 @@ namespace MQTTnet.TestApp.NetCore } else if (pressedKey.KeyChar == '3') { - Task.Run(PerformanceTest.RunAsync); + PerformanceTest.Run(); + return; } else if (pressedKey.KeyChar == '4') {