From 16a184a92f5982796b704f28893ebfb28176d690 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 12:51:05 +0200 Subject: [PATCH 01/14] use streams --- .../Implementations/MqttServerAdapter.cs | 9 +- .../Implementations/MqttTcpChannel.cs | 41 +-- .../Implementations/MqttWebSocketChannel.cs | 65 +---- .../Implementations/WebSocketStream.cs | 80 ++++++ .../MQTTnet.NetFramework.csproj | 1 + .../MQTTnet.NetFramework/MqttClientFactory.cs | 3 +- .../Implementations/MqttTcpChannel.cs | 42 +-- .../Implementations/MqttWebSocketChannel.cs | 67 +---- .../Implementations/WebSocketStream.cs | 80 ++++++ .../Channel/BufferedCommunicationChannel.cs | 77 ------ .../Channel/IMqttCommunicationChannel.cs | 13 +- MQTTnet.Core/MQTTnet.Core.csproj | 1 + MQTTnet.Core/Serializer/MqttPacketReader.cs | 28 +- .../Serializer/MqttPacketSerializer.cs | 241 ++++++++++-------- .../MqttPacketSerializerTests.cs | 18 +- 15 files changed, 330 insertions(+), 436 deletions(-) create mode 100644 Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs delete mode 100644 MQTTnet.Core/Channel/BufferedCommunicationChannel.cs diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 47dfb45..a5279e0 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -87,7 +87,9 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); - var clientAdapter = new MqttChannelCommunicationAdapter(new BufferedCommunicationChannel(new MqttTcpChannel(clientSocket, null)), new MqttPacketSerializer()); + + var tcpChannel = new MqttTcpChannel(clientSocket, null); + var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -110,8 +112,9 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); + + var tcpChannel = new MqttTcpChannel(clientSocket, sslStream); + var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 0ecf373..dede268 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -17,6 +17,8 @@ namespace MQTTnet.Implementations private Socket _socket; private SslStream _sslStream; + public Stream Stream => _dataStream; + /// /// called on client sockets are created in connect /// @@ -79,45 +81,6 @@ namespace MQTTnet.Implementations } } - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) throw new ArgumentNullException(nameof(buffer)); - - try - { - await _dataStream.WriteAsync(buffer, 0, buffer.Length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - try - { - var totalBytes = 0; - - do - { - var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false); - if (read == 0) - { - throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); - } - - totalBytes += read; - } - while (totalBytes < length); - return new ArraySegment(buffer, 0, length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - public void Dispose() { _socket?.Dispose(); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 925c818..c3aed02 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -2,6 +2,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Exceptions; using System; +using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -11,8 +12,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private int WebSocketBufferSize; - private int WebSocketBufferOffset; + + public Stream Stream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -22,6 +23,8 @@ namespace MQTTnet.Implementations { _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + + Stream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -31,6 +34,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { + Stream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } @@ -38,62 +42,5 @@ namespace MQTTnet.Implementations { _webSocket?.Dispose(); } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await ReadToBufferAsync(length, buffer).ConfigureAwait(false); - - var result = new ArraySegment(buffer, WebSocketBufferOffset, length); - WebSocketBufferSize -= length; - WebSocketBufferOffset += length; - - return result; - } - - private async Task ReadToBufferAsync(int length, byte[] buffer) - { - if (WebSocketBufferSize > 0) - { - return; - } - - var offset = 0; - while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length) - { - WebSocketReceiveResult response; - do - { - response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false); - offset += response.Count; - } while (!response.EndOfMessage); - - WebSocketBufferSize = response.Count; - if (response.MessageType == WebSocketMessageType.Close) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); - } - } - } - - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) { - throw new ArgumentNullException(nameof(buffer)); - } - - try - { - await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public int Peek() - { - return WebSocketBufferSize; - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs new file mode 100644 index 0000000..4a89a92 --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs @@ -0,0 +1,80 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public class WebSocketStream : Stream + { + private readonly ClientWebSocket _webSocket; + + public WebSocketStream( ClientWebSocket webSocket ) + { + _webSocket = webSocket; + } + + public override void Flush() + { + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var currentOffset = offset; + var targetOffset = offset + count; + while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset) + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); + currentOffset += response.Count; + + if ( response.MessageType == WebSocketMessageType.Close ) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + } + } + + return currentOffset - offset; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken); + } + + public override int Read( byte[] buffer, int offset, int count ) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override long Length + { + get { throw new NotSupportedException(); } + } + + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } +} diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj index 7578afe..3f2519f 100644 --- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj +++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj @@ -101,6 +101,7 @@ + diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index b3e0080..b057dde 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -22,7 +22,8 @@ namespace MQTTnet { case MqttConnectionType.Tcp: case MqttConnectionType.Tls: - return new BufferedCommunicationChannel( new MqttTcpChannel() ); + var tcp = new MqttTcpChannel(); + return tcp; case MqttConnectionType.Ws: case MqttConnectionType.Wss: return new MqttWebSocketChannel(); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index bddd79d..817ab44 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -17,6 +17,9 @@ namespace MQTTnet.Implementations private Socket _socket; private SslStream _sslStream; + + public Stream Stream => _dataStream; + /// /// called on client sockets are created in connect /// @@ -78,45 +81,6 @@ namespace MQTTnet.Implementations } } - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) throw new ArgumentNullException(nameof(buffer)); - - try - { - await _dataStream.WriteAsync(buffer, 0, buffer.Length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - try - { - var totalBytes = 0; - - do - { - var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false); - if (read == 0) - { - throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); - } - - totalBytes += read; - } - while (totalBytes < length); - return new ArraySegment(buffer, 0, length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - public void Dispose() { _socket?.Dispose(); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 5a89ac7..2e0bdf8 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -2,6 +2,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Exceptions; using System; +using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -11,8 +12,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private int _bufferSize; - private int _bufferOffset; + + public Stream Stream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -22,6 +23,8 @@ namespace MQTTnet.Implementations { _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + + Stream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -31,6 +34,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { + Stream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } @@ -38,64 +42,5 @@ namespace MQTTnet.Implementations { _webSocket?.Dispose(); } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await ReadToBufferAsync(length, buffer).ConfigureAwait(false); - - var result = new ArraySegment(buffer, _bufferOffset, length); - _bufferSize -= length; - _bufferOffset += length; - - return result; - } - - private async Task ReadToBufferAsync(int length, byte[] buffer) - { - if (_bufferSize > 0) - { - return; - } - - var offset = 0; - while (_webSocket.State == WebSocketState.Open && _bufferSize < length) - { - WebSocketReceiveResult response; - do - { - response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false); - offset += response.Count; - } while (!response.EndOfMessage); - - _bufferSize = response.Count; - - if (response.MessageType == WebSocketMessageType.Close) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); - } - } - } - - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - try - { - await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public int Peek() - { - return _bufferSize; - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs new file mode 100644 index 0000000..0b74692 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -0,0 +1,80 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public class WebSocketStream : Stream + { + private readonly ClientWebSocket _webSocket; + + public WebSocketStream(ClientWebSocket webSocket) + { + _webSocket = webSocket; + } + + public override void Flush() + { + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var currentOffset = offset; + var targetOffset = offset + count; + while ( _webSocket.State == WebSocketState.Open && currentOffset < targetOffset ) + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); + currentOffset += response.Count; + + if ( response.MessageType == WebSocketMessageType.Close ) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + } + } + + return currentOffset - offset; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync( buffer, offset, count ).GetAwaiter().GetResult(); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override long Length + { + get { throw new NotSupportedException(); } + } + + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } +} diff --git a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs b/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs deleted file mode 100644 index 757f9c2..0000000 --- a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System.Threading.Tasks; -using MQTTnet.Core.Client; -using System; - -namespace MQTTnet.Core.Channel -{ - public class BufferedCommunicationChannel : IMqttCommunicationChannel - { - private readonly IMqttCommunicationChannel _inner; - private int _bufferSize; - private int _bufferOffset; - - public BufferedCommunicationChannel(IMqttCommunicationChannel inner) - { - _inner = inner; - } - - public Task ConnectAsync(MqttClientOptions options) - { - return _inner.ConnectAsync(options); - } - - public Task DisconnectAsync() - { - return _inner.DisconnectAsync(); - } - - public int Peek() - { - return _inner.Peek(); - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - //read from buffer - if (_bufferSize > 0) - { - return ReadFomBuffer(length, buffer); - } - - var available = _inner.Peek(); - // if there are less or equal bytes available then requested then just read em - if (available <= length) - { - return await _inner.ReadAsync(length, buffer); - } - - //if more bytes are available than requested do buffer them to reduce calls to network buffers - await WriteToBuffer(available, buffer).ConfigureAwait(false); - return ReadFomBuffer(length, buffer); - } - - private async Task WriteToBuffer(int available, byte[] buffer) - { - await _inner.ReadAsync(available, buffer).ConfigureAwait(false); - _bufferSize = available; - _bufferOffset = 0; - } - - private ArraySegment ReadFomBuffer(int length, byte[] buffer) - { - var result = new ArraySegment(buffer, _bufferOffset, length); - _bufferSize -= length; - _bufferOffset += length; - - if (_bufferSize < 0) - { - } - return result; - } - - public Task WriteAsync(byte[] buffer) - { - return _inner.WriteAsync(buffer); - } - } -} diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 0f6ea4b..6fb72f3 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; using MQTTnet.Core.Client; -using System; +using System.IO; namespace MQTTnet.Core.Channel { @@ -9,14 +9,7 @@ namespace MQTTnet.Core.Channel Task ConnectAsync(MqttClientOptions options); Task DisconnectAsync(); - - Task WriteAsync(byte[] buffer); - - /// - /// get the currently available number of bytes without reading them - /// - int Peek(); - - Task> ReadAsync(int length, byte[] buffer); + + Stream Stream { get; } } } diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 480b4c7..797bb85 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -5,6 +5,7 @@ MQTTnet.Core MQTTnet.Core False + Full diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 38d3c7c..ea479c2 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -49,13 +49,13 @@ namespace MQTTnet.Core.Serializer return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } - public static async Task ReadHeaderFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer) + public static MqttPacketHeader ReadHeaderFromSource(IMqttCommunicationChannel source) { - var fixedHeader = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false); + var fixedHeader = (byte)source.Stream.ReadByte(); var byteReader = new ByteReader(fixedHeader); byteReader.Read(4); var controlPacketType = (MqttControlPacketType)byteReader.Read(4); - var bodyLength = await ReadBodyLengthFromSourceAsync(source, buffer).ConfigureAwait(false); + var bodyLength = ReadBodyLengthFromSource(source); return new MqttPacketHeader() { @@ -64,26 +64,8 @@ namespace MQTTnet.Core.Serializer BodyLength = bodyLength }; } - - private static async Task ReadStreamByteAsync(IMqttCommunicationChannel source, byte[] readBuffer) - { - var result = await ReadFromSourceAsync(source, 1, readBuffer).ConfigureAwait(false); - return result.Array[result.Offset]; - } - - public static async Task> ReadFromSourceAsync(IMqttCommunicationChannel source, int length, byte[] buffer) - { - try - { - return await source.ReadAsync(length, buffer); - } - catch (Exception exception) - { - throw new MqttCommunicationException(exception); - } - } - private static async Task ReadBodyLengthFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer) + private static int ReadBodyLengthFromSource(IMqttCommunicationChannel source) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; @@ -91,7 +73,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false); + encodedByte = (byte)source.Stream.ReadByte(); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 064b79b..6f2c9ff 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Core.Serializer private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - private byte[] _readBuffer = new byte[BufferConstants.Size]; + private Task _sendTask = Task.FromResult( 0 ); public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { @@ -32,12 +32,21 @@ namespace MQTTnet.Core.Serializer var body = stream.ToArray(); MqttPacketWriter.BuildLengthHeader(body.Length, header); - - await destination.WriteAsync(header.ToArray()).ConfigureAwait(false); - await destination.WriteAsync(body).ConfigureAwait(false); + var headerArray = header.ToArray(); + var writeBuffer = new byte[header.Count + body.Length]; + Buffer.BlockCopy( headerArray, 0, writeBuffer, 0, headerArray.Length ); + Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length ); + + _sendTask = Send( writeBuffer, destination ); } } + private async Task Send(byte[] buffer, IMqttCommunicationChannel destination ) + { + await _sendTask.ConfigureAwait( false ); + await destination.Stream.WriteAsync( buffer, 0, buffer.Length ).ConfigureAwait( false ); + } + private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer) { if (packet is MqttConnectPacket connectPacket) @@ -116,117 +125,127 @@ namespace MQTTnet.Core.Serializer public async Task DeserializeAsync(IMqttCommunicationChannel source) { if (source == null) throw new ArgumentNullException(nameof(source)); + + var header = MqttPacketReader.ReadHeaderFromSource(source); + + MemoryStream body = null; + if ( header.BodyLength > 0 ) + { + var totalRead = 0; + var readBuffer = new byte[header.BodyLength]; + do + { + var read = await source.Stream.ReadAsync( readBuffer, totalRead, header.BodyLength - totalRead ).ConfigureAwait(false); + totalRead += read; + } while ( totalRead < header.BodyLength ); + body = new MemoryStream( readBuffer, 0, header.BodyLength ); + } + else + { + body = new MemoryStream(); + } - var header = await MqttPacketReader.ReadHeaderFromSourceAsync(source, _readBuffer).ConfigureAwait(false); - - var body = await GetBody(source, header).ConfigureAwait(false); - - using (var mqttPacketReader = new MqttPacketReader(body, header)) + using (var reader = new MqttPacketReader(body, header)) { - switch (header.ControlPacketType) - { - case MqttControlPacketType.Connect: - { - return DeserializeConnect(mqttPacketReader); - } - - case MqttControlPacketType.ConnAck: - { - return DeserializeConnAck(mqttPacketReader); - } - - case MqttControlPacketType.Disconnect: - { - return new MqttDisconnectPacket(); - } - - case MqttControlPacketType.Publish: - { - return DeserializePublish(mqttPacketReader, header); - } - - case MqttControlPacketType.PubAck: - { - return new MqttPubAckPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubRec: - { - return new MqttPubRecPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubRel: - { - return new MqttPubRelPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubComp: - { - return new MqttPubCompPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PingReq: - { - return new MqttPingReqPacket(); - } - - case MqttControlPacketType.PingResp: - { - return new MqttPingRespPacket(); - } - - case MqttControlPacketType.Subscribe: - { - return DeserializeSubscribe(mqttPacketReader); - } - - case MqttControlPacketType.SubAck: - { - return DeserializeSubAck(mqttPacketReader); - } - - case MqttControlPacketType.Unsubscibe: - { - return DeserializeUnsubscribe(mqttPacketReader); - } - - case MqttControlPacketType.UnsubAck: - { - return new MqttUnsubAckPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - default: - { - throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported."); - } - } + return Deserialize( header, reader ); } } - private async Task GetBody(IMqttCommunicationChannel source, MqttPacketHeader header) + private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader) { - if (header.BodyLength > 0) + switch (header.ControlPacketType) { - var segment = await MqttPacketReader.ReadFromSourceAsync(source, header.BodyLength, _readBuffer).ConfigureAwait(false); - return new MemoryStream(segment.Array, segment.Offset, segment.Count); - } + case MqttControlPacketType.Connect: + { + return DeserializeConnect( reader ); + } + + case MqttControlPacketType.ConnAck: + { + return DeserializeConnAck( reader ); + } + + case MqttControlPacketType.Disconnect: + { + return new MqttDisconnectPacket(); + } + + case MqttControlPacketType.Publish: + { + return DeserializePublish( reader, header ); + } + + case MqttControlPacketType.PubAck: + { + return new MqttPubAckPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PubRec: + { + return new MqttPubRecPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } - return new MemoryStream(); + case MqttControlPacketType.PubRel: + { + return new MqttPubRelPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PubComp: + { + return new MqttPubCompPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PingReq: + { + return new MqttPingReqPacket(); + } + + case MqttControlPacketType.PingResp: + { + return new MqttPingRespPacket(); + } + + case MqttControlPacketType.Subscribe: + { + return DeserializeSubscribe( reader ); + } + + case MqttControlPacketType.SubAck: + { + return DeserializeSubAck( reader ); + } + + case MqttControlPacketType.Unsubscibe: + { + return DeserializeUnsubscribe( reader ); + } + + case MqttControlPacketType.UnsubAck: + { + return new MqttUnsubAckPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + default: + { + throw new MqttProtocolViolationException( + $"Packet type ({(int) header.ControlPacketType}) not supported." ); + } + } } private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader) @@ -268,7 +287,13 @@ namespace MQTTnet.Core.Serializer var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var dup = fixedHeader.Read(); - var topic = reader.ReadStringWithLengthPrefix(); + + var length = reader.ReadUInt16(); + if (length != 5) + { + + } + var topic = Encoding.UTF8.GetString( reader.ReadBytes( length ), 0, length ); ushort packetIdentifier = 0; if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 6a18f50..4f33be3 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -391,6 +391,8 @@ namespace MQTTnet.Core.Tests { private readonly MemoryStream _stream = new MemoryStream(); + public Stream Stream => _stream; + public bool IsConnected { get; } = true; public TestChannel() @@ -413,26 +415,10 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task WriteAsync(byte[] buffer) - { - return _stream.WriteAsync(buffer, 0, buffer.Length); - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await _stream.ReadAsync(buffer, 0, length); - return new ArraySegment(buffer, 0, length); - } - public byte[] ToArray() { return _stream.ToArray(); } - - public int Peek() - { - return (int)_stream.Length - (int)_stream.Position; - } } private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) From 01e739d5248facaace591ca61939db92002b1252 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 12:51:18 +0200 Subject: [PATCH 02/14] generate full debug symbols --- MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj index 7b69707..7bbbb86 100644 --- a/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj +++ b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -2,6 +2,7 @@ Exe + Full netcoreapp2.0 From 8fd3e5070dd3c95148c7e22843300d594da87f89 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 13:01:24 +0200 Subject: [PATCH 03/14] add missing await --- MQTTnet.Core/Serializer/MqttPacketSerializer.cs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 6f2c9ff..78c3cc0 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -38,6 +38,7 @@ namespace MQTTnet.Core.Serializer Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length ); _sendTask = Send( writeBuffer, destination ); + await _sendTask.ConfigureAwait( false ); } } @@ -286,14 +287,8 @@ namespace MQTTnet.Core.Serializer var retain = fixedHeader.Read(); var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var dup = fixedHeader.Read(); - - - var length = reader.ReadUInt16(); - if (length != 5) - { - - } - var topic = Encoding.UTF8.GetString( reader.ReadBytes( length ), 0, length ); + + var topic = reader.ReadStringWithLengthPrefix(); ushort packetIdentifier = 0; if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) From c001897590e8a6acec52c9cbb74ef8749d3d5311 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 13:34:06 +0200 Subject: [PATCH 04/14] remove async and framing from serializer --- .../MqttChannelCommunicationAdapter.cs | 46 +++++++++++++++++-- .../Serializer/IMqttPacketSerializer.cs | 7 ++- MQTTnet.Core/Serializer/MqttPacketReader.cs | 12 ++--- .../Serializer/MqttPacketSerializer.cs | 37 ++------------- .../MqttPacketSerializerTests.cs | 26 +++++------ 5 files changed, 69 insertions(+), 59 deletions(-) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index d0f1c6e..e42b5e5 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -1,4 +1,5 @@ using System; +using System.IO; using System.Threading.Tasks; using MQTTnet.Core.Channel; using MQTTnet.Core.Client; @@ -35,21 +36,33 @@ namespace MQTTnet.Core.Adapter { MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); - return ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); + var writeBuffer = PacketSerializer.Serialize(packet); + _sendTask = SendAsync( writeBuffer ); + return ExecuteWithTimeoutAsync(_sendTask, timeout); + } + + private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write + + private async Task SendAsync(byte[] buffer) + { + await _sendTask.ConfigureAwait(false); + await _channel.Stream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); } public async Task ReceivePacketAsync(TimeSpan timeout) { - MqttBasePacket packet; + Tuple tuple; if (timeout > TimeSpan.Zero) { - packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false); + tuple = await ExecuteWithTimeoutAsync(ReceiveAsync(), timeout).ConfigureAwait(false); } else { - packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false); + tuple = await ReceiveAsync().ConfigureAwait(false); } + var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2); + if (packet == null) { throw new MqttProtocolViolationException("Received malformed packet."); @@ -59,6 +72,31 @@ namespace MQTTnet.Core.Adapter return packet; } + private async Task> ReceiveAsync() + { + var header = MqttPacketReader.ReadHeaderFromSource(_channel.Stream); + + MemoryStream body = null; + if (header.BodyLength > 0) + { + var totalRead = 0; + var readBuffer = new byte[header.BodyLength]; + do + { + var read = await _channel.Stream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) + .ConfigureAwait( false ); + totalRead += read; + } while (totalRead < header.BodyLength); + body = new MemoryStream(readBuffer, 0, header.BodyLength); + } + else + { + body = new MemoryStream(); + } + + return Tuple.Create(header, body); + } + private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); diff --git a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs index 801b7ea..df5b045 100644 --- a/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/IMqttPacketSerializer.cs @@ -1,5 +1,4 @@ -using System.Threading.Tasks; -using MQTTnet.Core.Channel; +using System.IO; using MQTTnet.Core.Packets; namespace MQTTnet.Core.Serializer @@ -8,8 +7,8 @@ namespace MQTTnet.Core.Serializer { MqttProtocolVersion ProtocolVersion { get; set; } - Task SerializeAsync(MqttBasePacket mqttPacket, IMqttCommunicationChannel destination); + byte[] Serialize(MqttBasePacket mqttPacket); - Task DeserializeAsync(IMqttCommunicationChannel source); + MqttBasePacket Deserialize(MqttPacketHeader header, MemoryStream stream); } } \ No newline at end of file diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index ea479c2..1065078 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Core.Serializer private readonly MqttPacketHeader _header; public MqttPacketReader(Stream stream, MqttPacketHeader header) - : base(stream) + : base(stream, Encoding.UTF8, true) { _header = header; } @@ -49,13 +49,13 @@ namespace MQTTnet.Core.Serializer return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } - public static MqttPacketHeader ReadHeaderFromSource(IMqttCommunicationChannel source) + public static MqttPacketHeader ReadHeaderFromSource(Stream stream) { - var fixedHeader = (byte)source.Stream.ReadByte(); + var fixedHeader = (byte)stream.ReadByte(); var byteReader = new ByteReader(fixedHeader); byteReader.Read(4); var controlPacketType = (MqttControlPacketType)byteReader.Read(4); - var bodyLength = ReadBodyLengthFromSource(source); + var bodyLength = ReadBodyLengthFromSource(stream); return new MqttPacketHeader() { @@ -65,7 +65,7 @@ namespace MQTTnet.Core.Serializer }; } - private static int ReadBodyLengthFromSource(IMqttCommunicationChannel source) + private static int ReadBodyLengthFromSource(Stream stream) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; @@ -73,7 +73,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = (byte)source.Stream.ReadByte(); + encodedByte = (byte)stream.ReadByte(); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 78c3cc0..fa373f0 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -17,12 +17,10 @@ namespace MQTTnet.Core.Serializer private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - private Task _sendTask = Task.FromResult( 0 ); - public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) + public byte[] Serialize(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); - if (destination == null) throw new ArgumentNullException(nameof(destination)); using (var stream = new MemoryStream()) using (var writer = new MqttPacketWriter(stream)) @@ -37,17 +35,10 @@ namespace MQTTnet.Core.Serializer Buffer.BlockCopy( headerArray, 0, writeBuffer, 0, headerArray.Length ); Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length ); - _sendTask = Send( writeBuffer, destination ); - await _sendTask.ConfigureAwait( false ); + return writeBuffer; } } - private async Task Send(byte[] buffer, IMqttCommunicationChannel destination ) - { - await _sendTask.ConfigureAwait( false ); - await destination.Stream.WriteAsync( buffer, 0, buffer.Length ).ConfigureAwait( false ); - } - private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer) { if (packet is MqttConnectPacket connectPacket) @@ -123,29 +114,11 @@ namespace MQTTnet.Core.Serializer throw new MqttProtocolViolationException("Packet type invalid."); } - public async Task DeserializeAsync(IMqttCommunicationChannel source) + public MqttBasePacket Deserialize(MqttPacketHeader header, MemoryStream body) { - if (source == null) throw new ArgumentNullException(nameof(source)); + if (header == null) throw new ArgumentNullException(nameof(header)); + if (body == null) throw new ArgumentNullException(nameof(body)); - var header = MqttPacketReader.ReadHeaderFromSource(source); - - MemoryStream body = null; - if ( header.BodyLength > 0 ) - { - var totalRead = 0; - var readBuffer = new byte[header.BodyLength]; - do - { - var read = await source.Stream.ReadAsync( readBuffer, totalRead, header.BodyLength - totalRead ).ConfigureAwait(false); - totalRead += read; - } while ( totalRead < header.BodyLength ); - body = new MemoryStream( readBuffer, 0, header.BodyLength ); - } - else - { - body = new MemoryStream(); - } - using (var reader = new MqttPacketReader(body, header)) { return Deserialize( header, reader ); diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 4f33be3..bf0d6f5 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -424,9 +424,7 @@ namespace MQTTnet.Core.Tests private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) { var serializer = new MqttPacketSerializer { ProtocolVersion = protocolVersion }; - var channel = new TestChannel(); - serializer.SerializeAsync(packet, channel).Wait(); - var buffer = channel.ToArray(); + var buffer = serializer.Serialize(packet); Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(buffer)); } @@ -434,19 +432,21 @@ namespace MQTTnet.Core.Tests private static void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value) { var serializer = new MqttPacketSerializer(); + + var buffer1 = serializer.Serialize(packet); - var channel1 = new TestChannel(); - serializer.SerializeAsync(packet, channel1).Wait(); - var buffer1 = channel1.ToArray(); - - var channel2 = new TestChannel(buffer1); - var deserializedPacket = serializer.DeserializeAsync(channel2).Result; - var buffer2 = channel2.ToArray(); + using (var headerStream = new MemoryStream( buffer1 )) + { + var header = MqttPacketReader.ReadHeaderFromSource( headerStream ); - var channel3 = new TestChannel(buffer2); - serializer.SerializeAsync(deserializedPacket, channel3).Wait(); + using (var bodyStream = new MemoryStream( buffer1, (int)headerStream.Position, header.BodyLength )) + { + var deserializedPacket = serializer.Deserialize(header, bodyStream); + var buffer2 = serializer.Serialize( deserializedPacket ); - Assert.AreEqual(expectedBase64Value, Convert.ToBase64String(channel3.ToArray())); + Assert.AreEqual( expectedBase64Value, Convert.ToBase64String( buffer2 ) ); + } + } } } } From 323339b0b2b4dad64728b9349fafb7fa6a29d333 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 14:14:56 +0200 Subject: [PATCH 05/14] dont use byte reader writer in critical paths --- MQTTnet.Core/Serializer/MqttPacketReader.cs | 4 +--- .../Serializer/MqttPacketSerializer.cs | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 1065078..f07dcd7 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -52,9 +52,7 @@ namespace MQTTnet.Core.Serializer public static MqttPacketHeader ReadHeaderFromSource(Stream stream) { var fixedHeader = (byte)stream.ReadByte(); - var byteReader = new ByteReader(fixedHeader); - byteReader.Read(4); - var controlPacketType = (MqttControlPacketType)byteReader.Read(4); + var controlPacketType = (MqttControlPacketType)(fixedHeader >> 4); var bodyLength = ReadBodyLengthFromSource(stream); return new MqttPacketHeader() diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index fa373f0..59b6ba9 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -512,12 +512,21 @@ namespace MQTTnet.Core.Serializer writer.Write(packet.Payload); } - var fixedHeader = new ByteWriter(); - fixedHeader.Write(packet.Retain); - fixedHeader.Write((byte)packet.QualityOfServiceLevel, 2); - fixedHeader.Write(packet.Dup); + byte fixedHeader = 0; - return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value); + if (packet.Retain) + { + fixedHeader |= 0x01; + } + + fixedHeader |= (byte)((byte)packet.QualityOfServiceLevel << 1); + + if ( packet.Dup ) + { + fixedHeader |= 0x08; + } + + return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader); } private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer) From 5712bc11d28fa6d825a5a03a61f26e7e672d26ee Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 17:23:41 +0200 Subject: [PATCH 06/14] increased concurrency to force raceconditions --- .../PerformanceTest.cs | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index 32750d1..6175f32 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace MQTTnet.TestApp.NetFramework @@ -16,7 +17,7 @@ namespace MQTTnet.TestApp.NetFramework public static async Task RunAsync() { var server = Task.Run(() => RunServerAsync()); - var client = Task.Run(() => RunClientAsync(50, TimeSpan.FromMilliseconds(10))); + var client = Task.Run(() => RunClientAsync(500, TimeSpan.FromMilliseconds(10))); await Task.WhenAll(server, client).ConfigureAwait(false); } @@ -81,19 +82,11 @@ namespace MQTTnet.TestApp.NetFramework while (true) { - for (int i = 0; i < msgChunkSize; i++) - { - var applicationMessage = new MqttApplicationMessage( - "A/B/C", - Encoding.UTF8.GetBytes("Hello World"), - MqttQualityOfServiceLevel.AtLeastOnce, - false - ); - - //do not await to send as much messages as possible - await client.PublishAsync(applicationMessage); - msgs++; - } + var sendTasks = Enumerable.Range( 0, msgChunkSize ) + .Select( i => PublishSingleMessage( client, ref msgs ) ) + .ToList(); + + await Task.WhenAll( sendTasks ); var now = DateTime.Now; if (last < now - TimeSpan.FromSeconds(1)) @@ -112,6 +105,23 @@ namespace MQTTnet.TestApp.NetFramework } } + private static Task PublishSingleMessage( IMqttClient client, ref int count ) + { + Interlocked.Increment( ref count ); + return Task.Run( () => + { + var applicationMessage = new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes( "Hello World" ), + MqttQualityOfServiceLevel.AtLeastOnce, + false + ); + + //do not await to send as much messages as possible + return client.PublishAsync( applicationMessage ); + } ); + } + private static void RunServerAsync() { try From 090e59e99afdf1e278f0cab4a71639a98a2a5a8b Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 17:50:50 +0200 Subject: [PATCH 07/14] unfifed timeout handling and fixed memory usage due to Task.Delay tasks for each send package are present for the duration of the timeout resulting in memory usage. new approach uses cancellationtoken that will be cleaned up directly if operation completes before timeout --- .../MqttChannelCommunicationAdapter.cs | 38 ++----------- MQTTnet.Core/Client/MqttPacketDispatcher.cs | 20 ++++--- MQTTnet.Core/Internal/TaskExtensions.cs | 55 +++++++++++++++++++ Tests/MQTTnet.Core.Tests/ExtensionTests.cs | 33 +++++++++++ .../MQTTnet.Core.Tests.csproj | 1 + 5 files changed, 106 insertions(+), 41 deletions(-) create mode 100644 MQTTnet.Core/Internal/TaskExtensions.cs create mode 100644 Tests/MQTTnet.Core.Tests/ExtensionTests.cs diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index e42b5e5..63740b2 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -1,10 +1,12 @@ using System; using System.IO; +using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Channel; using MQTTnet.Core.Client; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Serializer; @@ -24,7 +26,7 @@ namespace MQTTnet.Core.Adapter public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - return ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); + return _channel.ConnectAsync(options).TimeoutAfter(timeout); } public Task DisconnectAsync() @@ -38,7 +40,7 @@ namespace MQTTnet.Core.Adapter var writeBuffer = PacketSerializer.Serialize(packet); _sendTask = SendAsync( writeBuffer ); - return ExecuteWithTimeoutAsync(_sendTask, timeout); + return _sendTask.TimeoutAfter(timeout); } private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write @@ -54,7 +56,7 @@ namespace MQTTnet.Core.Adapter Tuple tuple; if (timeout > TimeSpan.Zero) { - tuple = await ExecuteWithTimeoutAsync(ReceiveAsync(), timeout).ConfigureAwait(false); + tuple = await ReceiveAsync().TimeoutAfter(timeout).ConfigureAwait(false); } else { @@ -96,35 +98,5 @@ namespace MQTTnet.Core.Adapter return Tuple.Create(header, body); } - - private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) - { - var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) - { - throw new MqttCommunicationTimedOutException(); - } - - if (task.IsFaulted) - { - throw new MqttCommunicationException(task.Exception); - } - - return task.Result; - } - - private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) - { - var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) - { - throw new MqttCommunicationTimedOutException(); - } - - if (task.IsFaulted) - { - throw new MqttCommunicationException(task.Exception); - } - } } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs index f057b97..6d369b3 100644 --- a/MQTTnet.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using System.Collections.Concurrent; @@ -22,16 +23,19 @@ namespace MQTTnet.Core.Client var packetAwaiter = AddPacketAwaiter(request, responseType); DispatchPendingPackets(); - var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task).ConfigureAwait(false) != packetAwaiter.Task; - RemovePacketAwaiter(request, responseType); - - if (hasTimeout) + try { - MqttTrace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet."); - throw new MqttCommunicationTimedOutException(); + return await packetAwaiter.Task.TimeoutAfter( timeout ); + } + catch ( MqttCommunicationTimedOutException ) + { + MqttTrace.Warning( nameof( MqttPacketDispatcher ), "Timeout while waiting for packet." ); + throw; + } + finally + { + RemovePacketAwaiter(request, responseType); } - - return packetAwaiter.Task.Result; } public void Dispatch(MqttBasePacket packet) diff --git a/MQTTnet.Core/Internal/TaskExtensions.cs b/MQTTnet.Core/Internal/TaskExtensions.cs new file mode 100644 index 0000000..eb8f485 --- /dev/null +++ b/MQTTnet.Core/Internal/TaskExtensions.cs @@ -0,0 +1,55 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Exceptions; + +namespace MQTTnet.Core.Internal +{ + public static class TaskExtensions + { + public static Task TimeoutAfter( this Task task, TimeSpan timeout ) + { + return TimeoutAfter( task.ContinueWith( t => 0 ), timeout ); + } + + public static async Task TimeoutAfter(this Task task, TimeSpan timeout) + { + using (var cancellationTokenSource = new CancellationTokenSource()) + { + var tcs = new TaskCompletionSource(); + + cancellationTokenSource.Token.Register(() => + { + tcs.TrySetCanceled(); + } ); + + try + { + cancellationTokenSource.CancelAfter(timeout); + task.ContinueWith( t => + { + if (t.IsFaulted) + { + tcs.TrySetException(t.Exception); + } + + if (t.IsCompleted) + { + tcs.TrySetResult(t.Result); + } + }, cancellationTokenSource.Token ); + + return await tcs.Task; + } + catch (TaskCanceledException) + { + throw new MqttCommunicationTimedOutException(); + } + catch (Exception e) + { + throw new MqttCommunicationException(e); + } + } + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/ExtensionTests.cs b/Tests/MQTTnet.Core.Tests/ExtensionTests.cs new file mode 100644 index 0000000..1a3cd29 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/ExtensionTests.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Internal; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class ExtensionTests + { + [ExpectedException(typeof( MqttCommunicationTimedOutException ) )] + [TestMethod] + public async Task TestTimeoutAfter() + { + await Task.Delay(TimeSpan.FromMilliseconds(500)).TimeoutAfter(TimeSpan.FromMilliseconds(100)); + } + + [ExpectedException(typeof( MqttCommunicationTimedOutException))] + [TestMethod] + public async Task TestTimeoutAfterWithResult() + { + await Task.Delay(TimeSpan.FromMilliseconds(500)).ContinueWith(t => 5).TimeoutAfter(TimeSpan.FromMilliseconds(100)); + } + + [TestMethod] + public async Task TestTimeoutAfterCompleteInTime() + { + var result = await Task.Delay( TimeSpan.FromMilliseconds( 100 ) ).ContinueWith( t => 5 ).TimeoutAfter( TimeSpan.FromMilliseconds( 500 ) ); + Assert.AreEqual( 5, result ); + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj index 88bf8f2..2e20398 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj +++ b/Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj @@ -86,6 +86,7 @@ + From 473c8e0a15a499cc47a246838d397b70992d66e5 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Tue, 12 Sep 2017 10:11:21 +0200 Subject: [PATCH 08/14] send multiple packages at once msg/sec from 9000 to 32000 --- .../Implementations/MqttTcpChannel.cs | 22 ++++--- .../Implementations/MqttWebSocketChannel.cs | 8 ++- .../Implementations/MqttTcpChannel.cs | 3 +- .../Implementations/MqttWebSocketChannel.cs | 9 +-- .../Adapter/IMqttCommunicationAdapter.cs | 11 +++- .../MqttChannelCommunicationAdapter.cs | 22 ++++--- .../Channel/IMqttCommunicationChannel.cs | 4 +- MQTTnet.Core/Client/IMqttClient.cs | 10 +++- MQTTnet.Core/Client/MqttClient.cs | 58 +++++++++++-------- MQTTnet.Core/Server/MqttClientMessageQueue.cs | 2 +- MQTTnet.Core/Server/MqttClientSession.cs | 14 ++--- .../Server/MqttClientSessionsManager.cs | 8 +-- .../MqttPacketSerializerTests.cs | 4 +- .../TestMqttCommunicationAdapter.cs | 9 ++- .../PerformanceTest.cs | 48 ++++++++++----- 15 files changed, 151 insertions(+), 81 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index dede268..a1ee8c5 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -13,11 +13,13 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private Stream _dataStream; + private Stream _receiveStream; + private Stream _sendStream; private Socket _socket; private SslStream _sslStream; - public Stream Stream => _dataStream; + public Stream ReceiveStream => _receiveStream; + public Stream SendStream => _sendStream; /// /// called on client sockets are created in connect @@ -35,7 +37,7 @@ namespace MQTTnet.Implementations { _socket = socket ?? throw new ArgumentNullException(nameof(socket)); _sslStream = sslStream; - _dataStream = (Stream)sslStream ?? new NetworkStream(socket); + CreateCommStreams( socket, sslStream ); } public async Task ConnectAsync(MqttClientOptions options) @@ -54,13 +56,10 @@ namespace MQTTnet.Implementations { _sslStream = new SslStream(new NetworkStream(_socket, true)); - _dataStream = _sslStream; await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } - else - { - _dataStream = new NetworkStream(_socket); - } + + CreateCommStreams( _socket, _sslStream ); } catch (SocketException exception) { @@ -90,6 +89,13 @@ namespace MQTTnet.Implementations _sslStream = null; } + private void CreateCommStreams( Socket socket, SslStream sslStream ) + { + //cannot use this as default buffering prevents from receiving the first connect message + _receiveStream = (Stream)sslStream ?? new NetworkStream( socket ); + _sendStream = new BufferedStream( _receiveStream, BufferConstants.Size ); + } + private static X509CertificateCollection LoadCertificates(MqttClientOptions options) { var certificates = new X509CertificateCollection(); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index c3aed02..02cc32e 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -13,7 +13,9 @@ namespace MQTTnet.Implementations { private ClientWebSocket _webSocket = new ClientWebSocket(); - public Stream Stream { get; private set; } + public Stream ReceiveStream { get; private set; } + + public Stream SendStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -24,7 +26,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - Stream = new WebSocketStream(_webSocket); + ReceiveStream = SendStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -34,7 +36,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - Stream = null; + ReceiveStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 817ab44..b8916e0 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -18,7 +18,8 @@ namespace MQTTnet.Implementations private SslStream _sslStream; - public Stream Stream => _dataStream; + public Stream ReceiveStream => _dataStream; + public Stream SendStream => _dataStream; /// /// called on client sockets are created in connect diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 2e0bdf8..abce128 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -12,8 +12,9 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream Stream { get; private set; } + + public Stream SendStream { get; private set; } + public Stream ReceiveStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -24,7 +25,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - Stream = new WebSocketStream(_webSocket); + SendStream = ReceiveStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -34,7 +35,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - Stream = null; + SendStream = ReceiveStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index 77ab898..2aa2594 100644 --- a/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; @@ -12,10 +13,18 @@ namespace MQTTnet.Core.Adapter Task DisconnectAsync(); - Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); + Task SendPacketsAsync( TimeSpan timeout, IEnumerable packets ); Task ReceivePacketAsync(TimeSpan timeout); IMqttPacketSerializer PacketSerializer { get; } } + + public static class IMqttCommunicationAdapterExtensions + { + public static Task SendPacketsAsync( this IMqttCommunicationAdapter adapter, TimeSpan timeout, params MqttBasePacket[] packets ) + { + return adapter.SendPacketsAsync( timeout, packets ); + } + } } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 63740b2..b8a15fa 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -34,13 +35,18 @@ namespace MQTTnet.Core.Adapter return _channel.DisconnectAsync(); } - public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public async Task SendPacketsAsync( TimeSpan timeout, IEnumerable packets ) { - MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout); + foreach (var packet in packets ) + { + MqttTrace.Information( nameof( MqttChannelCommunicationAdapter ), "TX >>> {0} [Timeout={1}]", packet, timeout ); + + var writeBuffer = PacketSerializer.Serialize(packet); + _sendTask = SendAsync( writeBuffer ); + } - var writeBuffer = PacketSerializer.Serialize(packet); - _sendTask = SendAsync( writeBuffer ); - return _sendTask.TimeoutAfter(timeout); + await _sendTask.ConfigureAwait( false ); + await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false ); } private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write @@ -48,7 +54,7 @@ namespace MQTTnet.Core.Adapter private async Task SendAsync(byte[] buffer) { await _sendTask.ConfigureAwait(false); - await _channel.Stream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); + await _channel.SendStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -76,7 +82,7 @@ namespace MQTTnet.Core.Adapter private async Task> ReceiveAsync() { - var header = MqttPacketReader.ReadHeaderFromSource(_channel.Stream); + var header = MqttPacketReader.ReadHeaderFromSource(_channel.ReceiveStream); MemoryStream body = null; if (header.BodyLength > 0) @@ -85,7 +91,7 @@ namespace MQTTnet.Core.Adapter var readBuffer = new byte[header.BodyLength]; do { - var read = await _channel.Stream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) + var read = await _channel.ReceiveStream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) .ConfigureAwait( false ); totalRead += read; } while (totalRead < header.BodyLength); diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 6fb72f3..6b0854d 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -10,6 +10,8 @@ namespace MQTTnet.Core.Channel Task DisconnectAsync(); - Stream Stream { get; } + Stream SendStream { get; } + + Stream ReceiveStream { get; } } } diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 0170adf..8446e25 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -15,10 +15,18 @@ namespace MQTTnet.Core.Client Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null); Task DisconnectAsync(); - Task PublishAsync(MqttApplicationMessage applicationMessage); + Task PublishAsync(IEnumerable applicationMessages); Task> SubscribeAsync(IList topicFilters); Task> SubscribeAsync(params TopicFilter[] topicFilters); Task Unsubscribe(IList topicFilters); Task Unsubscribe(params string[] topicFilters); } + + public static class IMqttClientExtensions + { + public static Task PublishAsync( this IMqttClient client, params MqttApplicationMessage[] applicationMessages ) + { + return client.PublishAsync( applicationMessages ); + } + } } \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index deb0242..62c6bb3 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -161,32 +161,43 @@ namespace MQTTnet.Core.Client return SendAndReceiveAsync(unsubscribePacket); } - public Task PublishAsync(MqttApplicationMessage applicationMessage) + public async Task PublishAsync(IEnumerable applicationMessages) { - if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); ThrowIfNotConnected(); - var publishPacket = applicationMessage.ToPublishPacket(); + var publishPackets = applicationMessages.Select(m => m.ToPublishPacket()); - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + foreach (var qosGroup in publishPackets.GroupBy(p => p.QualityOfServiceLevel)) { - // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - return SendAsync(publishPacket); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - return SendAndReceiveAsync(publishPacket); - } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - return PublishExactlyOncePacketAsync(publishPacket); + var qosPackets = qosGroup.ToArray(); + switch ( qosGroup.Key ) + { + case MqttQualityOfServiceLevel.AtMostOnce: + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, qosPackets); + break; + case MqttQualityOfServiceLevel.AtLeastOnce: + { + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket); + } + break; + } + case MqttQualityOfServiceLevel.ExactlyOnce: + { + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await PublishExactlyOncePacketAsync( publishPacket ); + } + break; + } + default: + throw new InvalidOperationException(); + } } - - throw new InvalidOperationException(); } private async Task PublishExactlyOncePacketAsync(MqttBasePacket publishPacket) @@ -312,14 +323,13 @@ namespace MQTTnet.Core.Client private Task SendAsync(MqttBasePacket packet) { - return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); + return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packet); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); - - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.SendPacketsAsync( _options.DefaultCommunicationTimeout, requestPacket ).ConfigureAwait(false); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(requestPacket, typeof( TResponsePacket ), _options.DefaultCommunicationTimeout).ConfigureAwait(false); } private ushort GetNewPacketIdentifier() diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index 22fbdbb..ccf779d 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -105,7 +105,7 @@ namespace MQTTnet.Core.Server } publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0; - await _adapter.SendPacketAsync(publishPacketContext.PublishPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false); publishPacketContext.IsSent = true; } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 9b811ec..1411dd3 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -103,12 +103,12 @@ namespace MQTTnet.Core.Server { if (packet is MqttSubscribePacket subscribePacket) { - return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Subscribe(subscribePacket)); } if (packet is MqttUnsubscribePacket unsubscribePacket) { - return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _subscriptionsManager.Unsubscribe(unsubscribePacket)); } if (packet is MqttPublishPacket publishPacket) @@ -123,7 +123,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPubRecPacket pubRecPacket) { - return Adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, pubRecPacket.CreateResponse()); } if (packet is MqttPubAckPacket || packet is MqttPubCompPacket) @@ -134,7 +134,7 @@ namespace MQTTnet.Core.Server if (packet is MqttPingReqPacket) { - return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPingRespPacket()); } if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) @@ -160,7 +160,7 @@ namespace MQTTnet.Core.Server if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { _publishPacketReceivedCallback(this, publishPacket); - return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -173,7 +173,7 @@ namespace MQTTnet.Core.Server _publishPacketReceivedCallback(this, publishPacket); - return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } throw new MqttCommunicationException("Received a not supported QoS level."); @@ -186,7 +186,7 @@ namespace MQTTnet.Core.Server _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout); + return Adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); } } } diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs index b8ff4e6..483398d 100644 --- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs @@ -40,21 +40,21 @@ namespace MQTTnet.Core.Server var connectReturnCode = ValidateConnection(connectPacket); if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode - }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + }).ConfigureAwait(false); return; } var clientSession = GetOrCreateClientSession(connectPacket); - await eventArgs.ClientAdapter.SendPacketAsync(new MqttConnAckPacket + await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new MqttConnAckPacket { ConnectReturnCode = connectReturnCode, IsSessionPresent = clientSession.IsExistingSession - }, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + }).ConfigureAwait(false); await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false); } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index bf0d6f5..14da310 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -391,7 +391,9 @@ namespace MQTTnet.Core.Tests { private readonly MemoryStream _stream = new MemoryStream(); - public Stream Stream => _stream; + public Stream ReceiveStream => _stream; + + public Stream SendStream => _stream; public bool IsConnected { get; } = true; diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index fe6e448..50eb8a7 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; @@ -26,11 +27,15 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) + public Task SendPacketsAsync(TimeSpan timeout, IEnumerable packets) { ThrowIfPartnerIsNull(); - Partner.SendPacketInternal(packet); + foreach (var packet in packets) + { + Partner.SendPacketInternal(packet); + } + return Task.FromResult(0); } diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index 6175f32..fde64d9 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -78,21 +78,37 @@ namespace MQTTnet.TestApp.NetFramework Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); var last = DateTime.Now; - var msgs = 0; + var msgCount = 0; while (true) { - var sendTasks = Enumerable.Range( 0, msgChunkSize ) - .Select( i => PublishSingleMessage( client, ref msgs ) ) + var msgs = Enumerable.Range( 0, msgChunkSize ) + .Select( i => CreateMessage() ) .ToList(); - await Task.WhenAll( sendTasks ); + if (false) + { + //send concurrent (test for raceconditions) + var sendTasks = msgs + .Select( msg => PublishSingleMessage( client, msg, ref msgCount ) ) + .ToList(); + + await Task.WhenAll( sendTasks ); + } + else + { + await client.PublishAsync( msgs ); + msgCount += msgs.Count; + //send multiple + } + + var now = DateTime.Now; if (last < now - TimeSpan.FromSeconds(1)) { - Console.WriteLine( $"sending {msgs} inteded {msgChunkSize / interval.TotalSeconds}" ); - msgs = 0; + Console.WriteLine( $"sending {msgCount} inteded {msgChunkSize / interval.TotalSeconds}" ); + msgCount = 0; last = now; } @@ -105,19 +121,21 @@ namespace MQTTnet.TestApp.NetFramework } } - private static Task PublishSingleMessage( IMqttClient client, ref int count ) + private static MqttApplicationMessage CreateMessage() + { + return new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes( "Hello World" ), + MqttQualityOfServiceLevel.AtMostOnce, + false + ); + } + + private static Task PublishSingleMessage( IMqttClient client, MqttApplicationMessage applicationMessage, ref int count ) { Interlocked.Increment( ref count ); return Task.Run( () => { - var applicationMessage = new MqttApplicationMessage( - "A/B/C", - Encoding.UTF8.GetBytes( "Hello World" ), - MqttQualityOfServiceLevel.AtLeastOnce, - false - ); - - //do not await to send as much messages as possible return client.PublishAsync( applicationMessage ); } ); } From 97338b016de73b49eb7b8190b01fdf161f7f2805 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Tue, 12 Sep 2017 10:55:40 +0200 Subject: [PATCH 09/14] optimized communication adapter --- .../MqttChannelCommunicationAdapter.cs | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index b8a15fa..da0d442 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -16,6 +16,7 @@ namespace MQTTnet.Core.Adapter public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { private readonly IMqttCommunicationChannel _channel; + private readonly byte[] _readBuffer = new byte[BufferConstants.Size]; public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) { @@ -37,12 +38,16 @@ namespace MQTTnet.Core.Adapter public async Task SendPacketsAsync( TimeSpan timeout, IEnumerable packets ) { - foreach (var packet in packets ) + lock ( _channel ) { - MqttTrace.Information( nameof( MqttChannelCommunicationAdapter ), "TX >>> {0} [Timeout={1}]", packet, timeout ); + foreach (var packet in packets ) + { + MqttTrace.Information( nameof( MqttChannelCommunicationAdapter ), "TX >>> {0} [Timeout={1}]", packet, timeout ); - var writeBuffer = PacketSerializer.Serialize(packet); - _sendTask = SendAsync( writeBuffer ); + var writeBuffer = PacketSerializer.Serialize(packet); + + _sendTask = _sendTask.ContinueWith( p => _channel.SendStream.WriteAsync( writeBuffer, 0, writeBuffer.Length ) ); + } } await _sendTask.ConfigureAwait( false ); @@ -51,12 +56,6 @@ namespace MQTTnet.Core.Adapter private Task _sendTask = Task.FromResult(0); // this task is used to prevent overlapping write - private async Task SendAsync(byte[] buffer) - { - await _sendTask.ConfigureAwait(false); - await _channel.SendStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait( false ); - } - public async Task ReceivePacketAsync(TimeSpan timeout) { Tuple tuple; @@ -82,20 +81,20 @@ namespace MQTTnet.Core.Adapter private async Task> ReceiveAsync() { - var header = MqttPacketReader.ReadHeaderFromSource(_channel.ReceiveStream); + var stream = _channel.ReceiveStream; + var header = MqttPacketReader.ReadHeaderFromSource(stream); MemoryStream body = null; if (header.BodyLength > 0) { var totalRead = 0; - var readBuffer = new byte[header.BodyLength]; do { - var read = await _channel.ReceiveStream.ReadAsync(readBuffer, totalRead, header.BodyLength - totalRead) + var read = await stream.ReadAsync(_readBuffer, totalRead, header.BodyLength - totalRead) .ConfigureAwait( false ); totalRead += read; } while (totalRead < header.BodyLength); - body = new MemoryStream(readBuffer, 0, header.BodyLength); + body = new MemoryStream(_readBuffer, 0, header.BodyLength); } else { From 6c7e70f78ae0e6032e514ba1d5cf1fb9fe4351e6 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Tue, 12 Sep 2017 10:58:41 +0200 Subject: [PATCH 10/14] use blockingcollection for pipelining --- MQTTnet.Core/Internal/AsyncAutoResetEvent.cs | 32 ------------ MQTTnet.Core/Server/MqttClientMessageQueue.cs | 50 +++++-------------- 2 files changed, 12 insertions(+), 70 deletions(-) delete mode 100644 MQTTnet.Core/Internal/AsyncAutoResetEvent.cs diff --git a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs deleted file mode 100644 index 795748d..0000000 --- a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace MQTTnet.Core.Internal -{ - public sealed class AsyncGate - { - private readonly Queue> _waitingTasks = new Queue>(); - - public Task WaitOneAsync() - { - var tcs = new TaskCompletionSource(); - lock (_waitingTasks) - { - _waitingTasks.Enqueue(tcs); - } - - return tcs.Task; - } - - public void Set() - { - lock (_waitingTasks) - { - if (_waitingTasks.Count > 0) - { - _waitingTasks.Dequeue().SetResult(true); - } - } - } - } -} diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs index ccf779d..396366e 100644 --- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs +++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs @@ -1,20 +1,17 @@ using System; -using System.Collections.Generic; -using System.Linq; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; -using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; namespace MQTTnet.Core.Server { public sealed class MqttClientMessageQueue { - private readonly List _pendingPublishPackets = new List(); - private readonly AsyncGate _gate = new AsyncGate(); + private readonly BlockingCollection _pendingPublishPackets = new BlockingCollection(); private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; @@ -43,55 +40,38 @@ namespace MQTTnet.Core.Server _adapter = null; _cancellationTokenSource?.Cancel(); _cancellationTokenSource = null; + _pendingPublishPackets?.Dispose(); } public void Enqueue(MqttPublishPacket publishPacket) { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - lock (_pendingPublishPackets) - { - _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket)); - _gate.Set(); - } + _pendingPublishPackets.Add( new MqttClientPublishPacketContext( publishPacket ) ); } private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken) { - while (!cancellationToken.IsCancellationRequested) + foreach (var publishPacket in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken)) { try { - await _gate.WaitOneAsync().ConfigureAwait(false); - if (cancellationToken.IsCancellationRequested) + if ( cancellationToken.IsCancellationRequested ) { return; } - if (_adapter == null) + if ( _adapter == null ) { continue; } - List pendingPublishPackets; - lock (_pendingPublishPackets) - { - pendingPublishPackets = _pendingPublishPackets.ToList(); - } - - foreach (var publishPacket in pendingPublishPackets) - { - await TrySendPendingPublishPacketAsync(publishPacket).ConfigureAwait(false); - } + await TrySendPendingPublishPacketAsync( publishPacket ).ConfigureAwait( false ); } - catch (Exception e) + catch ( Exception e ) { - MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets."); + MqttTrace.Error( nameof( MqttClientMessageQueue ), e, "Error while sending pending publish packets." ); } - finally - { - Cleanup(); - } } } @@ -112,23 +92,17 @@ namespace MQTTnet.Core.Server catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + _pendingPublishPackets.Add( publishPacketContext ); } catch (Exception exception) { MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed."); + _pendingPublishPackets.Add( publishPacketContext ); } finally { publishPacketContext.SendTries++; } } - - private void Cleanup() - { - lock (_pendingPublishPackets) - { - _pendingPublishPackets.RemoveAll(p => p.IsSent); - } - } } } From c7c1691d755f3bd0cba4b63edf89c406c277384d Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Tue, 12 Sep 2017 12:59:12 +0200 Subject: [PATCH 11/14] use raw buffer for frist connect msg then use buffered stream to read more efficient --- .../Implementations/MqttTcpChannel.cs | 11 +++++++---- .../Implementations/MqttWebSocketChannel.cs | 11 ++++++----- .../Implementations/MqttTcpChannel.cs | 1 + .../Implementations/MqttWebSocketChannel.cs | 11 ++++++----- .../Adapter/MqttChannelCommunicationAdapter.cs | 9 ++++----- MQTTnet.Core/Channel/IMqttCommunicationChannel.cs | 2 ++ Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs | 2 ++ .../TestMqttCommunicationAdapter.cs | 6 ++++++ Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs | 2 +- 9 files changed, 35 insertions(+), 20 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index a1ee8c5..9c3679a 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -13,13 +13,15 @@ namespace MQTTnet.Implementations { public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private Stream _receiveStream; + private Stream _rawStream; private Stream _sendStream; + private Stream _receiveStream; private Socket _socket; private SslStream _sslStream; - public Stream ReceiveStream => _receiveStream; + public Stream RawStream => _rawStream; public Stream SendStream => _sendStream; + public Stream ReceiveStream => _receiveStream; /// /// called on client sockets are created in connect @@ -92,8 +94,9 @@ namespace MQTTnet.Implementations private void CreateCommStreams( Socket socket, SslStream sslStream ) { //cannot use this as default buffering prevents from receiving the first connect message - _receiveStream = (Stream)sslStream ?? new NetworkStream( socket ); - _sendStream = new BufferedStream( _receiveStream, BufferConstants.Size ); + _rawStream = (Stream)sslStream ?? new NetworkStream( socket ); + _sendStream = new BufferedStream( _rawStream, BufferConstants.Size ); + _receiveStream = new BufferedStream( _rawStream, BufferConstants.Size ); } private static X509CertificateCollection LoadCertificates(MqttClientOptions options) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 02cc32e..bc224cb 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -12,10 +12,11 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream ReceiveStream { get; private set; } - public Stream SendStream { get; private set; } + public Stream RawStream { get; private set; } + + public Stream SendStream => RawStream; + public Stream ReceiveStream => RawStream; public async Task ConnectAsync(MqttClientOptions options) { @@ -26,7 +27,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - ReceiveStream = SendStream = new WebSocketStream(_webSocket); + RawStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -36,7 +37,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - ReceiveStream = null; + RawStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index b8916e0..536ebf6 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -19,6 +19,7 @@ namespace MQTTnet.Implementations public Stream ReceiveStream => _dataStream; + public Stream RawStream => _dataStream; public Stream SendStream => _dataStream; /// diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index abce128..e452cda 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -12,9 +12,10 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - - public Stream SendStream { get; private set; } - public Stream ReceiveStream { get; private set; } + + public Stream SendStream => RawStream; + public Stream ReceiveStream => RawStream; + public Stream RawStream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -25,7 +26,7 @@ namespace MQTTnet.Implementations _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); - SendStream = ReceiveStream = new WebSocketStream(_webSocket); + RawStream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -35,7 +36,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { - SendStream = ReceiveStream = null; + RawStream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index da0d442..09875e2 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -50,7 +50,7 @@ namespace MQTTnet.Core.Adapter } } - await _sendTask.ConfigureAwait( false ); + await _sendTask; // configure await false geneates stackoverflow await _channel.SendStream.FlushAsync().TimeoutAfter( timeout ).ConfigureAwait( false ); } @@ -61,11 +61,11 @@ namespace MQTTnet.Core.Adapter Tuple tuple; if (timeout > TimeSpan.Zero) { - tuple = await ReceiveAsync().TimeoutAfter(timeout).ConfigureAwait(false); + tuple = await ReceiveAsync(_channel.RawStream).TimeoutAfter(timeout).ConfigureAwait(false); } else { - tuple = await ReceiveAsync().ConfigureAwait(false); + tuple = await ReceiveAsync(_channel.RawStream).ConfigureAwait(false); } var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2); @@ -79,9 +79,8 @@ namespace MQTTnet.Core.Adapter return packet; } - private async Task> ReceiveAsync() + private async Task> ReceiveAsync(Stream stream) { - var stream = _channel.ReceiveStream; var header = MqttPacketReader.ReadHeaderFromSource(stream); MemoryStream body = null; diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 6b0854d..80c1308 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -13,5 +13,7 @@ namespace MQTTnet.Core.Channel Stream SendStream { get; } Stream ReceiveStream { get; } + + Stream RawStream { get; } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 14da310..e0ae8fa 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -393,6 +393,8 @@ namespace MQTTnet.Core.Tests public Stream ReceiveStream => _stream; + public Stream RawStream => _stream; + public Stream SendStream => _stream; public bool IsConnected { get; } = true; diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index 50eb8a7..e3e9028 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; @@ -46,6 +47,11 @@ namespace MQTTnet.Core.Tests return Task.Run(() => _incomingPackets.Take()); } + public IEnumerable ReceivePackets( CancellationToken cancellationToken ) + { + return _incomingPackets.GetConsumingEnumerable(); + } + private void SendPacketInternal(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); diff --git a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs index fde64d9..049e9c9 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs @@ -17,7 +17,7 @@ namespace MQTTnet.TestApp.NetFramework public static async Task RunAsync() { var server = Task.Run(() => RunServerAsync()); - var client = Task.Run(() => RunClientAsync(500, TimeSpan.FromMilliseconds(10))); + var client = Task.Run(() => RunClientAsync(300, TimeSpan.FromMilliseconds(10))); await Task.WhenAll(server, client).ConfigureAwait(false); } From 285c70f24f482702232f94c2dcde4c6ae68810cc Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Wed, 13 Sep 2017 09:19:41 +0200 Subject: [PATCH 12/14] removed unused code --- .../Implementations/MqttTcpChannel.cs | 9 +++------ .../Implementations/MqttTcpChannel.cs | 5 ----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 9c3679a..677f759 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -93,8 +93,10 @@ namespace MQTTnet.Implementations private void CreateCommStreams( Socket socket, SslStream sslStream ) { - //cannot use this as default buffering prevents from receiving the first connect message _rawStream = (Stream)sslStream ?? new NetworkStream( socket ); + + //cannot use this as default buffering prevents from receiving the first connect message + //need two streams otherwise read and write have to be synchronized _sendStream = new BufferedStream( _rawStream, BufferConstants.Size ); _receiveStream = new BufferedStream( _rawStream, BufferConstants.Size ); } @@ -114,10 +116,5 @@ namespace MQTTnet.Implementations return certificates; } - - public int Peek() - { - return _socket.Available; - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index 536ebf6..cca53a1 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -107,10 +107,5 @@ namespace MQTTnet.Implementations return certificates; } - - public int Peek() - { - return _socket.Available; - } } } \ No newline at end of file From 27bb22de0954edd1bf673af6b4f04a896f7a07fb Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Wed, 13 Sep 2017 09:35:46 +0200 Subject: [PATCH 13/14] use buffered receive stream --- MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 09875e2..a249238 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -65,7 +65,7 @@ namespace MQTTnet.Core.Adapter } else { - tuple = await ReceiveAsync(_channel.RawStream).ConfigureAwait(false); + tuple = await ReceiveAsync(_channel.ReceiveStream).ConfigureAwait(false); } var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2); From 455a047242538d92e1105f72ed237c54ede5d6af Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Wed, 13 Sep 2017 09:35:56 +0200 Subject: [PATCH 14/14] fixed async await return bug --- MQTTnet.Core/Client/MqttClient.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 62c6bb3..62a5476 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -288,12 +288,14 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { FireApplicationMessageReceivedEvent(publishPacket); + return; } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { FireApplicationMessageReceivedEvent(publishPacket); await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + return; } if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -306,6 +308,7 @@ namespace MQTTnet.Core.Client FireApplicationMessageReceivedEvent(publishPacket); await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); + return; } throw new MqttCommunicationException("Received a not supported QoS level.");