From 16a184a92f5982796b704f28893ebfb28176d690 Mon Sep 17 00:00:00 2001 From: Eggers Jan Date: Mon, 11 Sep 2017 12:51:05 +0200 Subject: [PATCH] use streams --- .../Implementations/MqttServerAdapter.cs | 9 +- .../Implementations/MqttTcpChannel.cs | 41 +-- .../Implementations/MqttWebSocketChannel.cs | 65 +---- .../Implementations/WebSocketStream.cs | 80 ++++++ .../MQTTnet.NetFramework.csproj | 1 + .../MQTTnet.NetFramework/MqttClientFactory.cs | 3 +- .../Implementations/MqttTcpChannel.cs | 42 +-- .../Implementations/MqttWebSocketChannel.cs | 67 +---- .../Implementations/WebSocketStream.cs | 80 ++++++ .../Channel/BufferedCommunicationChannel.cs | 77 ------ .../Channel/IMqttCommunicationChannel.cs | 13 +- MQTTnet.Core/MQTTnet.Core.csproj | 1 + MQTTnet.Core/Serializer/MqttPacketReader.cs | 28 +- .../Serializer/MqttPacketSerializer.cs | 241 ++++++++++-------- .../MqttPacketSerializerTests.cs | 18 +- 15 files changed, 330 insertions(+), 436 deletions(-) create mode 100644 Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs delete mode 100644 MQTTnet.Core/Channel/BufferedCommunicationChannel.cs diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 47dfb45..a5279e0 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -87,7 +87,9 @@ namespace MQTTnet.Implementations try { var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); - var clientAdapter = new MqttChannelCommunicationAdapter(new BufferedCommunicationChannel(new MqttTcpChannel(clientSocket, null)), new MqttPacketSerializer()); + + var tcpChannel = new MqttTcpChannel(clientSocket, null); + var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) when (!(exception is ObjectDisposedException)) @@ -110,8 +112,9 @@ namespace MQTTnet.Implementations var sslStream = new SslStream(new NetworkStream(clientSocket)); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); - - var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); + + var tcpChannel = new MqttTcpChannel(clientSocket, sslStream); + var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } catch (Exception exception) diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 0ecf373..dede268 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -17,6 +17,8 @@ namespace MQTTnet.Implementations private Socket _socket; private SslStream _sslStream; + public Stream Stream => _dataStream; + /// /// called on client sockets are created in connect /// @@ -79,45 +81,6 @@ namespace MQTTnet.Implementations } } - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) throw new ArgumentNullException(nameof(buffer)); - - try - { - await _dataStream.WriteAsync(buffer, 0, buffer.Length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - try - { - var totalBytes = 0; - - do - { - var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false); - if (read == 0) - { - throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); - } - - totalBytes += read; - } - while (totalBytes < length); - return new ArraySegment(buffer, 0, length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - public void Dispose() { _socket?.Dispose(); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs index 925c818..c3aed02 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs @@ -2,6 +2,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Exceptions; using System; +using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -11,8 +12,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private int WebSocketBufferSize; - private int WebSocketBufferOffset; + + public Stream Stream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -22,6 +23,8 @@ namespace MQTTnet.Implementations { _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + + Stream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -31,6 +34,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { + Stream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } @@ -38,62 +42,5 @@ namespace MQTTnet.Implementations { _webSocket?.Dispose(); } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await ReadToBufferAsync(length, buffer).ConfigureAwait(false); - - var result = new ArraySegment(buffer, WebSocketBufferOffset, length); - WebSocketBufferSize -= length; - WebSocketBufferOffset += length; - - return result; - } - - private async Task ReadToBufferAsync(int length, byte[] buffer) - { - if (WebSocketBufferSize > 0) - { - return; - } - - var offset = 0; - while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length) - { - WebSocketReceiveResult response; - do - { - response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false); - offset += response.Count; - } while (!response.EndOfMessage); - - WebSocketBufferSize = response.Count; - if (response.MessageType == WebSocketMessageType.Close) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); - } - } - } - - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) { - throw new ArgumentNullException(nameof(buffer)); - } - - try - { - await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public int Peek() - { - return WebSocketBufferSize; - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs new file mode 100644 index 0000000..4a89a92 --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/Implementations/WebSocketStream.cs @@ -0,0 +1,80 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public class WebSocketStream : Stream + { + private readonly ClientWebSocket _webSocket; + + public WebSocketStream( ClientWebSocket webSocket ) + { + _webSocket = webSocket; + } + + public override void Flush() + { + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var currentOffset = offset; + var targetOffset = offset + count; + while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset) + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); + currentOffset += response.Count; + + if ( response.MessageType == WebSocketMessageType.Close ) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + } + } + + return currentOffset - offset; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken); + } + + public override int Read( byte[] buffer, int offset, int count ) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override long Length + { + get { throw new NotSupportedException(); } + } + + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } +} diff --git a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj index 7578afe..3f2519f 100644 --- a/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj +++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj @@ -101,6 +101,7 @@ + diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index b3e0080..b057dde 100644 --- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -22,7 +22,8 @@ namespace MQTTnet { case MqttConnectionType.Tcp: case MqttConnectionType.Tls: - return new BufferedCommunicationChannel( new MqttTcpChannel() ); + var tcp = new MqttTcpChannel(); + return tcp; case MqttConnectionType.Ws: case MqttConnectionType.Wss: return new MqttWebSocketChannel(); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index bddd79d..817ab44 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -17,6 +17,9 @@ namespace MQTTnet.Implementations private Socket _socket; private SslStream _sslStream; + + public Stream Stream => _dataStream; + /// /// called on client sockets are created in connect /// @@ -78,45 +81,6 @@ namespace MQTTnet.Implementations } } - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) throw new ArgumentNullException(nameof(buffer)); - - try - { - await _dataStream.WriteAsync(buffer, 0, buffer.Length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - try - { - var totalBytes = 0; - - do - { - var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false); - if (read == 0) - { - throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); - } - - totalBytes += read; - } - while (totalBytes < length); - return new ArraySegment(buffer, 0, length); - } - catch (SocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - public void Dispose() { _socket?.Dispose(); diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 5a89ac7..2e0bdf8 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -2,6 +2,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Exceptions; using System; +using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -11,8 +12,8 @@ namespace MQTTnet.Implementations public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { private ClientWebSocket _webSocket = new ClientWebSocket(); - private int _bufferSize; - private int _bufferOffset; + + public Stream Stream { get; private set; } public async Task ConnectAsync(MqttClientOptions options) { @@ -22,6 +23,8 @@ namespace MQTTnet.Implementations { _webSocket = new ClientWebSocket(); await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + + Stream = new WebSocketStream(_webSocket); } catch (WebSocketException exception) { @@ -31,6 +34,7 @@ namespace MQTTnet.Implementations public Task DisconnectAsync() { + Stream = null; return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } @@ -38,64 +42,5 @@ namespace MQTTnet.Implementations { _webSocket?.Dispose(); } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await ReadToBufferAsync(length, buffer).ConfigureAwait(false); - - var result = new ArraySegment(buffer, _bufferOffset, length); - _bufferSize -= length; - _bufferOffset += length; - - return result; - } - - private async Task ReadToBufferAsync(int length, byte[] buffer) - { - if (_bufferSize > 0) - { - return; - } - - var offset = 0; - while (_webSocket.State == WebSocketState.Open && _bufferSize < length) - { - WebSocketReceiveResult response; - do - { - response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false); - offset += response.Count; - } while (!response.EndOfMessage); - - _bufferSize = response.Count; - - if (response.MessageType == WebSocketMessageType.Close) - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); - } - } - } - - public async Task WriteAsync(byte[] buffer) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - try - { - await _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, CancellationToken.None); - } - catch (WebSocketException exception) - { - throw new MqttCommunicationException(exception); - } - } - - public int Peek() - { - return _bufferSize; - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs new file mode 100644 index 0000000..0b74692 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -0,0 +1,80 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public class WebSocketStream : Stream + { + private readonly ClientWebSocket _webSocket; + + public WebSocketStream(ClientWebSocket webSocket) + { + _webSocket = webSocket; + } + + public override void Flush() + { + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var currentOffset = offset; + var targetOffset = offset + count; + while ( _webSocket.State == WebSocketState.Open && currentOffset < targetOffset ) + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); + currentOffset += response.Count; + + if ( response.MessageType == WebSocketMessageType.Close ) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + } + } + + return currentOffset - offset; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _webSocket.SendAsync(new ArraySegment( buffer, offset, count ), WebSocketMessageType.Binary, true, cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync( buffer, offset, count ).GetAwaiter().GetResult(); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override long Length + { + get { throw new NotSupportedException(); } + } + + public override long Position + { + get { throw new NotSupportedException(); } + set { throw new NotSupportedException(); } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } +} diff --git a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs b/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs deleted file mode 100644 index 757f9c2..0000000 --- a/MQTTnet.Core/Channel/BufferedCommunicationChannel.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System.Threading.Tasks; -using MQTTnet.Core.Client; -using System; - -namespace MQTTnet.Core.Channel -{ - public class BufferedCommunicationChannel : IMqttCommunicationChannel - { - private readonly IMqttCommunicationChannel _inner; - private int _bufferSize; - private int _bufferOffset; - - public BufferedCommunicationChannel(IMqttCommunicationChannel inner) - { - _inner = inner; - } - - public Task ConnectAsync(MqttClientOptions options) - { - return _inner.ConnectAsync(options); - } - - public Task DisconnectAsync() - { - return _inner.DisconnectAsync(); - } - - public int Peek() - { - return _inner.Peek(); - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - //read from buffer - if (_bufferSize > 0) - { - return ReadFomBuffer(length, buffer); - } - - var available = _inner.Peek(); - // if there are less or equal bytes available then requested then just read em - if (available <= length) - { - return await _inner.ReadAsync(length, buffer); - } - - //if more bytes are available than requested do buffer them to reduce calls to network buffers - await WriteToBuffer(available, buffer).ConfigureAwait(false); - return ReadFomBuffer(length, buffer); - } - - private async Task WriteToBuffer(int available, byte[] buffer) - { - await _inner.ReadAsync(available, buffer).ConfigureAwait(false); - _bufferSize = available; - _bufferOffset = 0; - } - - private ArraySegment ReadFomBuffer(int length, byte[] buffer) - { - var result = new ArraySegment(buffer, _bufferOffset, length); - _bufferSize -= length; - _bufferOffset += length; - - if (_bufferSize < 0) - { - } - return result; - } - - public Task WriteAsync(byte[] buffer) - { - return _inner.WriteAsync(buffer); - } - } -} diff --git a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 0f6ea4b..6fb72f3 100644 --- a/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; using MQTTnet.Core.Client; -using System; +using System.IO; namespace MQTTnet.Core.Channel { @@ -9,14 +9,7 @@ namespace MQTTnet.Core.Channel Task ConnectAsync(MqttClientOptions options); Task DisconnectAsync(); - - Task WriteAsync(byte[] buffer); - - /// - /// get the currently available number of bytes without reading them - /// - int Peek(); - - Task> ReadAsync(int length, byte[] buffer); + + Stream Stream { get; } } } diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index 480b4c7..797bb85 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -5,6 +5,7 @@ MQTTnet.Core MQTTnet.Core False + Full diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 38d3c7c..ea479c2 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -49,13 +49,13 @@ namespace MQTTnet.Core.Serializer return ReadBytes(_header.BodyLength - (int)BaseStream.Position); } - public static async Task ReadHeaderFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer) + public static MqttPacketHeader ReadHeaderFromSource(IMqttCommunicationChannel source) { - var fixedHeader = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false); + var fixedHeader = (byte)source.Stream.ReadByte(); var byteReader = new ByteReader(fixedHeader); byteReader.Read(4); var controlPacketType = (MqttControlPacketType)byteReader.Read(4); - var bodyLength = await ReadBodyLengthFromSourceAsync(source, buffer).ConfigureAwait(false); + var bodyLength = ReadBodyLengthFromSource(source); return new MqttPacketHeader() { @@ -64,26 +64,8 @@ namespace MQTTnet.Core.Serializer BodyLength = bodyLength }; } - - private static async Task ReadStreamByteAsync(IMqttCommunicationChannel source, byte[] readBuffer) - { - var result = await ReadFromSourceAsync(source, 1, readBuffer).ConfigureAwait(false); - return result.Array[result.Offset]; - } - - public static async Task> ReadFromSourceAsync(IMqttCommunicationChannel source, int length, byte[] buffer) - { - try - { - return await source.ReadAsync(length, buffer); - } - catch (Exception exception) - { - throw new MqttCommunicationException(exception); - } - } - private static async Task ReadBodyLengthFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer) + private static int ReadBodyLengthFromSource(IMqttCommunicationChannel source) { // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. var multiplier = 1; @@ -91,7 +73,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false); + encodedByte = (byte)source.Stream.ReadByte(); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index 064b79b..6f2c9ff 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Core.Serializer private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; - private byte[] _readBuffer = new byte[BufferConstants.Size]; + private Task _sendTask = Task.FromResult( 0 ); public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { @@ -32,12 +32,21 @@ namespace MQTTnet.Core.Serializer var body = stream.ToArray(); MqttPacketWriter.BuildLengthHeader(body.Length, header); - - await destination.WriteAsync(header.ToArray()).ConfigureAwait(false); - await destination.WriteAsync(body).ConfigureAwait(false); + var headerArray = header.ToArray(); + var writeBuffer = new byte[header.Count + body.Length]; + Buffer.BlockCopy( headerArray, 0, writeBuffer, 0, headerArray.Length ); + Buffer.BlockCopy( body, 0, writeBuffer, headerArray.Length, body.Length ); + + _sendTask = Send( writeBuffer, destination ); } } + private async Task Send(byte[] buffer, IMqttCommunicationChannel destination ) + { + await _sendTask.ConfigureAwait( false ); + await destination.Stream.WriteAsync( buffer, 0, buffer.Length ).ConfigureAwait( false ); + } + private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer) { if (packet is MqttConnectPacket connectPacket) @@ -116,117 +125,127 @@ namespace MQTTnet.Core.Serializer public async Task DeserializeAsync(IMqttCommunicationChannel source) { if (source == null) throw new ArgumentNullException(nameof(source)); + + var header = MqttPacketReader.ReadHeaderFromSource(source); + + MemoryStream body = null; + if ( header.BodyLength > 0 ) + { + var totalRead = 0; + var readBuffer = new byte[header.BodyLength]; + do + { + var read = await source.Stream.ReadAsync( readBuffer, totalRead, header.BodyLength - totalRead ).ConfigureAwait(false); + totalRead += read; + } while ( totalRead < header.BodyLength ); + body = new MemoryStream( readBuffer, 0, header.BodyLength ); + } + else + { + body = new MemoryStream(); + } - var header = await MqttPacketReader.ReadHeaderFromSourceAsync(source, _readBuffer).ConfigureAwait(false); - - var body = await GetBody(source, header).ConfigureAwait(false); - - using (var mqttPacketReader = new MqttPacketReader(body, header)) + using (var reader = new MqttPacketReader(body, header)) { - switch (header.ControlPacketType) - { - case MqttControlPacketType.Connect: - { - return DeserializeConnect(mqttPacketReader); - } - - case MqttControlPacketType.ConnAck: - { - return DeserializeConnAck(mqttPacketReader); - } - - case MqttControlPacketType.Disconnect: - { - return new MqttDisconnectPacket(); - } - - case MqttControlPacketType.Publish: - { - return DeserializePublish(mqttPacketReader, header); - } - - case MqttControlPacketType.PubAck: - { - return new MqttPubAckPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubRec: - { - return new MqttPubRecPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubRel: - { - return new MqttPubRelPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PubComp: - { - return new MqttPubCompPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - case MqttControlPacketType.PingReq: - { - return new MqttPingReqPacket(); - } - - case MqttControlPacketType.PingResp: - { - return new MqttPingRespPacket(); - } - - case MqttControlPacketType.Subscribe: - { - return DeserializeSubscribe(mqttPacketReader); - } - - case MqttControlPacketType.SubAck: - { - return DeserializeSubAck(mqttPacketReader); - } - - case MqttControlPacketType.Unsubscibe: - { - return DeserializeUnsubscribe(mqttPacketReader); - } - - case MqttControlPacketType.UnsubAck: - { - return new MqttUnsubAckPacket - { - PacketIdentifier = mqttPacketReader.ReadUInt16() - }; - } - - default: - { - throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported."); - } - } + return Deserialize( header, reader ); } } - private async Task GetBody(IMqttCommunicationChannel source, MqttPacketHeader header) + private static MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader) { - if (header.BodyLength > 0) + switch (header.ControlPacketType) { - var segment = await MqttPacketReader.ReadFromSourceAsync(source, header.BodyLength, _readBuffer).ConfigureAwait(false); - return new MemoryStream(segment.Array, segment.Offset, segment.Count); - } + case MqttControlPacketType.Connect: + { + return DeserializeConnect( reader ); + } + + case MqttControlPacketType.ConnAck: + { + return DeserializeConnAck( reader ); + } + + case MqttControlPacketType.Disconnect: + { + return new MqttDisconnectPacket(); + } + + case MqttControlPacketType.Publish: + { + return DeserializePublish( reader, header ); + } + + case MqttControlPacketType.PubAck: + { + return new MqttPubAckPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PubRec: + { + return new MqttPubRecPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } - return new MemoryStream(); + case MqttControlPacketType.PubRel: + { + return new MqttPubRelPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PubComp: + { + return new MqttPubCompPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + case MqttControlPacketType.PingReq: + { + return new MqttPingReqPacket(); + } + + case MqttControlPacketType.PingResp: + { + return new MqttPingRespPacket(); + } + + case MqttControlPacketType.Subscribe: + { + return DeserializeSubscribe( reader ); + } + + case MqttControlPacketType.SubAck: + { + return DeserializeSubAck( reader ); + } + + case MqttControlPacketType.Unsubscibe: + { + return DeserializeUnsubscribe( reader ); + } + + case MqttControlPacketType.UnsubAck: + { + return new MqttUnsubAckPacket + { + PacketIdentifier = reader.ReadUInt16() + }; + } + + default: + { + throw new MqttProtocolViolationException( + $"Packet type ({(int) header.ControlPacketType}) not supported." ); + } + } } private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader) @@ -268,7 +287,13 @@ namespace MQTTnet.Core.Serializer var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var dup = fixedHeader.Read(); - var topic = reader.ReadStringWithLengthPrefix(); + + var length = reader.ReadUInt16(); + if (length != 5) + { + + } + var topic = Encoding.UTF8.GetString( reader.ReadBytes( length ), 0, length ); ushort packetIdentifier = 0; if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) diff --git a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs index 6a18f50..4f33be3 100644 --- a/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs @@ -391,6 +391,8 @@ namespace MQTTnet.Core.Tests { private readonly MemoryStream _stream = new MemoryStream(); + public Stream Stream => _stream; + public bool IsConnected { get; } = true; public TestChannel() @@ -413,26 +415,10 @@ namespace MQTTnet.Core.Tests return Task.FromResult(0); } - public Task WriteAsync(byte[] buffer) - { - return _stream.WriteAsync(buffer, 0, buffer.Length); - } - - public async Task> ReadAsync(int length, byte[] buffer) - { - await _stream.ReadAsync(buffer, 0, length); - return new ArraySegment(buffer, 0, length); - } - public byte[] ToArray() { return _stream.ToArray(); } - - public int Peek() - { - return (int)_stream.Length - (int)_stream.Position; - } } private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)