Browse Source

Merge pull request #36 from JanEggers/Buffering

added perf test app
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
16949ffb21
17 changed files with 643 additions and 447 deletions
  1. +2
    -1
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +9
    -5
      Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs
  3. +28
    -37
      Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs
  4. +1
    -1
      Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
  5. +9
    -5
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  6. +28
    -37
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  7. +7
    -0
      MQTTnet.Core/Channel/BufferConstants.cs
  8. +77
    -0
      MQTTnet.Core/Channel/BufferedCommunicationChannel.cs
  9. +7
    -1
      MQTTnet.Core/Channel/IMqttCommunicationChannel.cs
  10. +13
    -0
      MQTTnet.Core/Packets/MqttPacketHeader.cs
  11. +52
    -86
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  12. +207
    -223
      MQTTnet.Core/Serializer/MqttPacketSerializer.cs
  13. +23
    -49
      MQTTnet.Core/Serializer/MqttPacketWriter.cs
  14. +8
    -2
      Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs
  15. +1
    -0
      Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj
  16. +165
    -0
      Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs
  17. +6
    -0
      Tests/MQTTnet.TestApp.NetFramework/Program.cs

+ 2
- 1
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -10,6 +10,7 @@ using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Serializer; using MQTTnet.Core.Serializer;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using MQTTnet.Core.Channel;


namespace MQTTnet.Implementations namespace MQTTnet.Implementations
{ {
@@ -86,7 +87,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new BufferedCommunicationChannel(new MqttTcpChannel(clientSocket, null)), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
} }
catch (Exception exception) when (!(exception is ObjectDisposedException)) catch (Exception exception) when (!(exception is ObjectDisposedException))


+ 9
- 5
Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs View File

@@ -93,17 +93,15 @@ namespace MQTTnet.Implementations
} }
} }


public async Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try try
{ {
var totalBytes = 0; var totalBytes = 0;


do do
{ {
var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false);
var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
if (read == 0) if (read == 0)
{ {
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
@@ -111,7 +109,8 @@ namespace MQTTnet.Implementations


totalBytes += read; totalBytes += read;
} }
while (totalBytes < buffer.Length);
while (totalBytes < length);
return new ArraySegment<byte>(buffer, 0, length);
} }
catch (SocketException exception) catch (SocketException exception)
{ {
@@ -143,5 +142,10 @@ namespace MQTTnet.Implementations


return certificates; return certificates;
} }

public int Peek()
{
return _socket.Available;
}
} }
} }

+ 28
- 37
Frameworks/MQTTnet.NetFramework/Implementations/MqttWebSocketChannel.cs View File

@@ -11,9 +11,6 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket(); private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


@@ -42,50 +39,39 @@ namespace MQTTnet.Implementations
_webSocket?.Dispose(); _webSocket?.Dispose();
} }


public Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
return Task.WhenAll(ReadToBufferAsync(buffer));
await ReadToBufferAsync(length, buffer).ConfigureAwait(false);
var result = new ArraySegment<byte>(buffer, WebSocketBufferOffset, length);
WebSocketBufferSize -= length;
WebSocketBufferOffset += length;
return result;
} }


private async Task ReadToBufferAsync(byte[] buffer)
private async Task ReadToBufferAsync(int length, byte[] buffer)
{ {
var temporaryBuffer = new byte[BufferSize];
var offset = 0;
if (WebSocketBufferSize > 0)
{
return;
}


while (_webSocket.State == WebSocketState.Open)
var offset = 0;
while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length)
{ {
if (WebSocketBufferSize == 0)
WebSocketReceiveResult response;
do
{ {
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None).ConfigureAwait(false);

temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
offset += response.Count;
} while (!response.EndOfMessage);


WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{ {
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
} }

return;
} }
} }


@@ -105,5 +91,10 @@ namespace MQTTnet.Implementations
throw new MqttCommunicationException(exception); throw new MqttCommunicationException(exception);
} }
} }

public int Peek()
{
return WebSocketBufferSize;
}
} }
} }

+ 1
- 1
Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs View File

@@ -22,7 +22,7 @@ namespace MQTTnet
{ {
case MqttConnectionType.Tcp: case MqttConnectionType.Tcp:
case MqttConnectionType.Tls: case MqttConnectionType.Tls:
return new MqttTcpChannel();
return new BufferedCommunicationChannel( new MqttTcpChannel() );
case MqttConnectionType.Ws: case MqttConnectionType.Ws:
case MqttConnectionType.Wss: case MqttConnectionType.Wss:
return new MqttWebSocketChannel(); return new MqttWebSocketChannel();


+ 9
- 5
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs View File

@@ -92,17 +92,15 @@ namespace MQTTnet.Implementations
} }
} }


public async Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
if (buffer == null) throw new ArgumentNullException(nameof(buffer));

try try
{ {
var totalBytes = 0; var totalBytes = 0;


do do
{ {
var read = await _dataStream.ReadAsync(buffer, totalBytes, buffer.Length - totalBytes).ConfigureAwait(false);
var read = await _dataStream.ReadAsync(buffer, totalBytes, length - totalBytes).ConfigureAwait(false);
if (read == 0) if (read == 0)
{ {
throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting)); throw new MqttCommunicationException(new SocketException((int)SocketError.Disconnecting));
@@ -110,7 +108,8 @@ namespace MQTTnet.Implementations


totalBytes += read; totalBytes += read;
} }
while (totalBytes < buffer.Length);
while (totalBytes < length);
return new ArraySegment<byte>(buffer, 0, length);
} }
catch (SocketException exception) catch (SocketException exception)
{ {
@@ -142,5 +141,10 @@ namespace MQTTnet.Implementations


return certificates; return certificates;
} }

public int Peek()
{
return _socket.Available;
}
} }
} }

+ 28
- 37
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs View File

@@ -11,9 +11,6 @@ namespace MQTTnet.Implementations
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{ {
private ClientWebSocket _webSocket = new ClientWebSocket(); private ClientWebSocket _webSocket = new ClientWebSocket();
private const int BufferSize = 4096;
private const int BufferAmplifier = 20;
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier];
private int WebSocketBufferSize; private int WebSocketBufferSize;
private int WebSocketBufferOffset; private int WebSocketBufferOffset;


@@ -42,50 +39,39 @@ namespace MQTTnet.Implementations
_webSocket?.Dispose(); _webSocket?.Dispose();
} }


public Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
return Task.WhenAll(ReadToBufferAsync(buffer));
await ReadToBufferAsync(length, buffer).ConfigureAwait(false);

var result = new ArraySegment<byte>(buffer, WebSocketBufferOffset, length);
WebSocketBufferSize -= length;
WebSocketBufferOffset += length;

return result;
} }


private async Task ReadToBufferAsync(byte[] buffer)
private async Task ReadToBufferAsync(int length, byte[] buffer)
{ {
var temporaryBuffer = new byte[BufferSize];
var offset = 0;
if (WebSocketBufferSize > 0)
{
return;
}


while (_webSocket.State == WebSocketState.Open)
var offset = 0;
while (_webSocket.State == WebSocketState.Open && WebSocketBufferSize < length)
{ {
if (WebSocketBufferSize == 0)
WebSocketReceiveResult response;
do
{ {
WebSocketBufferOffset = 0;

WebSocketReceiveResult response;
do
{
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None).ConfigureAwait(false);
response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, buffer.Length - offset), CancellationToken.None).ConfigureAwait(false);
offset += response.Count;
} while (!response.EndOfMessage);


temporaryBuffer.CopyTo(WebSocketBuffer, offset);
offset += response.Count;
temporaryBuffer = new byte[BufferSize];
} while (!response.EndOfMessage);

WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}

Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
}
else
WebSocketBufferSize = response.Count;
if (response.MessageType == WebSocketMessageType.Close)
{ {
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length);
WebSocketBufferSize -= buffer.Length;
WebSocketBufferOffset += buffer.Length;
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
} }

return;
} }
} }


@@ -105,5 +91,10 @@ namespace MQTTnet.Implementations
throw new MqttCommunicationException(exception); throw new MqttCommunicationException(exception);
} }
} }

public int Peek()
{
return WebSocketBufferSize;
}
} }
} }

+ 7
- 0
MQTTnet.Core/Channel/BufferConstants.cs View File

@@ -0,0 +1,7 @@
namespace MQTTnet.Core.Channel
{
public static class BufferConstants
{
public const int Size = 4096 * 20;
}
}

+ 77
- 0
MQTTnet.Core/Channel/BufferedCommunicationChannel.cs View File

@@ -0,0 +1,77 @@
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using System;

namespace MQTTnet.Core.Channel
{
public class BufferedCommunicationChannel : IMqttCommunicationChannel
{
private IMqttCommunicationChannel _inner { get; }
private int _bufferSize = 0;
private int _bufferOffset = 0;

public BufferedCommunicationChannel(IMqttCommunicationChannel inner)
{
_inner = inner;
}

public Task ConnectAsync(MqttClientOptions options)
{
return _inner.ConnectAsync(options);
}

public Task DisconnectAsync()
{
return _inner.DisconnectAsync();
}

public int Peek()
{
return _inner.Peek();
}

public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{
//read from buffer
if (_bufferSize > 0)
{
return ReadFomBuffer(length, buffer);
}

var available = _inner.Peek();
// if there are less or equal bytes available then requested then just read em
if (available <= length)
{
return await _inner.ReadAsync(length, buffer);
}

//if more bytes are available than requested do buffer them to reduce calls to network buffers
await WriteToBuffer(available, buffer).ConfigureAwait(false);
return ReadFomBuffer(length, buffer);
}

private async Task WriteToBuffer(int available, byte[] buffer)
{
await _inner.ReadAsync(available, buffer).ConfigureAwait(false);
_bufferSize = available;
_bufferOffset = 0;
}

private ArraySegment<byte> ReadFomBuffer(int length, byte[] buffer)
{
var result = new ArraySegment<byte>(buffer, _bufferOffset, length);
_bufferSize -= length;
_bufferOffset += length;

if (_bufferSize < 0)
{
}
return result;
}

public Task WriteAsync(byte[] buffer)
{
return _inner.WriteAsync(buffer);
}
}
}

+ 7
- 1
MQTTnet.Core/Channel/IMqttCommunicationChannel.cs View File

@@ -1,5 +1,6 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Client; using MQTTnet.Core.Client;
using System;


namespace MQTTnet.Core.Channel namespace MQTTnet.Core.Channel
{ {
@@ -11,6 +12,11 @@ namespace MQTTnet.Core.Channel


Task WriteAsync(byte[] buffer); Task WriteAsync(byte[] buffer);


Task ReadAsync(byte[] buffer);
/// <summary>
/// get the currently available number of bytes without reading them
/// </summary>
int Peek();

Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer);
} }
} }

+ 13
- 0
MQTTnet.Core/Packets/MqttPacketHeader.cs View File

@@ -0,0 +1,13 @@
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Packets
{
public class MqttPacketHeader
{
public MqttControlPacketType ControlPacketType { get; set; }

public byte FixedHeader { get; set; }

public int BodyLength { get; set; }
}
}

+ 52
- 86
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -2,55 +2,28 @@
using System.IO; using System.IO;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Packets;


namespace MQTTnet.Core.Serializer namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketReader : IDisposable
public sealed class MqttPacketReader : BinaryReader
{ {
private readonly MemoryStream _remainingData = new MemoryStream(1024);
private readonly IMqttCommunicationChannel _source;
private readonly MqttPacketHeader _header;


private int _remainingLength;

public MqttPacketReader(IMqttCommunicationChannel source)
public MqttPacketReader(Stream stream, MqttPacketHeader header)
: base(stream)
{ {
_source = source ?? throw new ArgumentNullException(nameof(source));
_header = header;
} }

public MqttControlPacketType ControlPacketType { get; private set; }

public byte FixedHeader { get; private set; }

public bool EndOfRemainingData => _remainingData.Position == _remainingData.Length;

public async Task ReadToEndAsync()
public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength;
public override ushort ReadUInt16()
{ {
await ReadFixedHeaderAsync().ConfigureAwait(false);
await ReadRemainingLengthAsync().ConfigureAwait(false);

if (_remainingLength == 0)
{
return;
}

var buffer = new byte[_remainingLength];
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
_remainingData.Write(buffer, 0, buffer.Length);
_remainingData.Position = 0;
}

public byte ReadRemainingDataByte()
{
return ReadRemainingData(1)[0];
}

public ushort ReadRemainingDataUShort()
{
var buffer = ReadRemainingData(2);
var buffer = ReadBytes(2);


var temp = buffer[0]; var temp = buffer[0];
buffer[0] = buffer[1]; buffer[0] = buffer[1];
@@ -59,31 +32,58 @@ namespace MQTTnet.Core.Serializer
return BitConverter.ToUInt16(buffer, 0); return BitConverter.ToUInt16(buffer, 0);
} }


public string ReadRemainingDataStringWithLengthPrefix()
public string ReadStringWithLengthPrefix()
{ {
var buffer = ReadRemainingDataWithLengthPrefix();
var buffer = ReadWithLengthPrefix();
return Encoding.UTF8.GetString(buffer, 0, buffer.Length); return Encoding.UTF8.GetString(buffer, 0, buffer.Length);
} }


public byte[] ReadRemainingDataWithLengthPrefix()
public byte[] ReadWithLengthPrefix()
{ {
var length = ReadRemainingDataUShort();
return ReadRemainingData(length);
var length = ReadUInt16();
return ReadBytes(length);
} }


public byte[] ReadRemainingData() public byte[] ReadRemainingData()
{ {
return ReadRemainingData(_remainingLength - (int)_remainingData.Position);
return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
} }


public byte[] ReadRemainingData(int length)
public static async Task<MqttPacketHeader> ReadHeaderFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
{ {
var buffer = new byte[length];
_remainingData.Read(buffer, 0, buffer.Length);
return buffer;
var fixedHeader = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
var byteReader = new ByteReader(fixedHeader);
byteReader.Read(4);
var controlPacketType = (MqttControlPacketType)byteReader.Read(4);
var bodyLength = await ReadBodyLengthFromSourceAsync(source, buffer).ConfigureAwait(false);

return new MqttPacketHeader()
{
FixedHeader = fixedHeader,
ControlPacketType = controlPacketType,
BodyLength = bodyLength
};
} }


private async Task ReadRemainingLengthAsync()
private static async Task<byte> ReadStreamByteAsync(IMqttCommunicationChannel source, byte[] readBuffer)
{
var result = await ReadFromSourceAsync(source, 1, readBuffer).ConfigureAwait(false);
return result.Array[result.Offset];
}

public static async Task<ArraySegment<byte>> ReadFromSourceAsync(IMqttCommunicationChannel source, int length, byte[] buffer)
{
try
{
return await source.ReadAsync(length, buffer);
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
}
private static async Task<int> ReadBodyLengthFromSourceAsync(IMqttCommunicationChannel source, byte[] buffer)
{ {
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var multiplier = 1; var multiplier = 1;
@@ -91,7 +91,7 @@ namespace MQTTnet.Core.Serializer
byte encodedByte; byte encodedByte;
do do
{ {
encodedByte = await ReadStreamByteAsync().ConfigureAwait(false);
encodedByte = await ReadStreamByteAsync(source, buffer).ConfigureAwait(false);
value += (encodedByte & 127) * multiplier; value += (encodedByte & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
if (multiplier > 128 * 128 * 128) if (multiplier > 128 * 128 * 128)
@@ -99,41 +99,7 @@ namespace MQTTnet.Core.Serializer
throw new MqttProtocolViolationException("Remaining length is ivalid."); throw new MqttProtocolViolationException("Remaining length is ivalid.");
} }
} while ((encodedByte & 128) != 0); } while ((encodedByte & 128) != 0);

_remainingLength = value;
}

private Task ReadFromSourceAsync(byte[] buffer)
{
try
{
return _source.ReadAsync(buffer);
}
catch (Exception exception)
{
throw new MqttCommunicationException(exception);
}
}

private async Task<byte> ReadStreamByteAsync()
{
var buffer = new byte[1];
await ReadFromSourceAsync(buffer).ConfigureAwait(false);
return buffer[0];
}

private async Task ReadFixedHeaderAsync()
{
FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false);

var byteReader = new ByteReader(FixedHeader);
byteReader.Read(4);
ControlPacketType = (MqttControlPacketType)byteReader.Read(4);
}

public void Dispose()
{
_remainingData?.Dispose();
return value;
} }
} }
} }

+ 207
- 223
MQTTnet.Core/Serializer/MqttPacketSerializer.cs View File

@@ -1,4 +1,6 @@
using System; using System;
using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -15,80 +17,97 @@ namespace MQTTnet.Core.Serializer
private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs"); private static byte[] ProtocolVersionV310Name { get; } = Encoding.UTF8.GetBytes("MQIs");


public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
private byte[] _readBuffer = new byte[BufferConstants.Size];


public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination)); if (destination == null) throw new ArgumentNullException(nameof(destination));


using (var stream = new MemoryStream())
using (var writer = new MqttPacketWriter(stream))
{
var header = new List<byte>();
header.Add(SerializePacket(packet, writer));

var body = stream.ToArray();
MqttPacketWriter.BuildLengthHeader(body.Length, header);
await destination.WriteAsync(header.ToArray()).ConfigureAwait(false);
await destination.WriteAsync(body).ConfigureAwait(false);
}
}

private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
{
if (packet is MqttConnectPacket connectPacket) if (packet is MqttConnectPacket connectPacket)
{ {
return SerializeAsync(connectPacket, destination);
return Serialize(connectPacket, writer);
} }


if (packet is MqttConnAckPacket connAckPacket) if (packet is MqttConnAckPacket connAckPacket)
{ {
return SerializeAsync(connAckPacket, destination);
return Serialize(connAckPacket, writer);
} }


if (packet is MqttDisconnectPacket disconnectPacket) if (packet is MqttDisconnectPacket disconnectPacket)
{ {
return SerializeAsync(disconnectPacket, destination);
return Serialize(disconnectPacket, writer);
} }


if (packet is MqttPingReqPacket pingReqPacket) if (packet is MqttPingReqPacket pingReqPacket)
{ {
return SerializeAsync(pingReqPacket, destination);
return Serialize(pingReqPacket, writer);
} }


if (packet is MqttPingRespPacket pingRespPacket) if (packet is MqttPingRespPacket pingRespPacket)
{ {
return SerializeAsync(pingRespPacket, destination);
return Serialize(pingRespPacket, writer);
} }


if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
{ {
return SerializeAsync(publishPacket, destination);
return Serialize(publishPacket, writer);
} }


if (packet is MqttPubAckPacket pubAckPacket) if (packet is MqttPubAckPacket pubAckPacket)
{ {
return SerializeAsync(pubAckPacket, destination);
return Serialize(pubAckPacket, writer);
} }


if (packet is MqttPubRecPacket pubRecPacket) if (packet is MqttPubRecPacket pubRecPacket)
{ {
return SerializeAsync(pubRecPacket, destination);
return Serialize(pubRecPacket, writer);
} }


if (packet is MqttPubRelPacket pubRelPacket) if (packet is MqttPubRelPacket pubRelPacket)
{ {
return SerializeAsync(pubRelPacket, destination);
return Serialize(pubRelPacket, writer);
} }


if (packet is MqttPubCompPacket pubCompPacket) if (packet is MqttPubCompPacket pubCompPacket)
{ {
return SerializeAsync(pubCompPacket, destination);
return Serialize(pubCompPacket, writer);
} }


if (packet is MqttSubscribePacket subscribePacket) if (packet is MqttSubscribePacket subscribePacket)
{ {
return SerializeAsync(subscribePacket, destination);
return Serialize(subscribePacket, writer);
} }


if (packet is MqttSubAckPacket subAckPacket) if (packet is MqttSubAckPacket subAckPacket)
{ {
return SerializeAsync(subAckPacket, destination);
return Serialize(subAckPacket, writer);
} }


if (packet is MqttUnsubscribePacket unsubscribePacket) if (packet is MqttUnsubscribePacket unsubscribePacket)
{ {
return SerializeAsync(unsubscribePacket, destination);
return Serialize(unsubscribePacket, writer);
} }


if (packet is MqttUnsubAckPacket unsubAckPacket) if (packet is MqttUnsubAckPacket unsubAckPacket)
{ {
return SerializeAsync(unsubAckPacket, destination);
return Serialize(unsubAckPacket, writer);
} }


throw new MqttProtocolViolationException("Packet type invalid."); throw new MqttProtocolViolationException("Packet type invalid.");
@@ -98,11 +117,13 @@ namespace MQTTnet.Core.Serializer
{ {
if (source == null) throw new ArgumentNullException(nameof(source)); if (source == null) throw new ArgumentNullException(nameof(source));


using (var mqttPacketReader = new MqttPacketReader(source))
{
await mqttPacketReader.ReadToEndAsync().ConfigureAwait(false);
var header = await MqttPacketReader.ReadHeaderFromSourceAsync(source, _readBuffer).ConfigureAwait(false);
var body = await GetBody(source, header).ConfigureAwait(false);


switch (mqttPacketReader.ControlPacketType)
using (var mqttPacketReader = new MqttPacketReader(body, header))
{
switch (header.ControlPacketType)
{ {
case MqttControlPacketType.Connect: case MqttControlPacketType.Connect:
{ {
@@ -121,14 +142,14 @@ namespace MQTTnet.Core.Serializer


case MqttControlPacketType.Publish: case MqttControlPacketType.Publish:
{ {
return DeserializePublish(mqttPacketReader);
return DeserializePublish(mqttPacketReader, header);
} }


case MqttControlPacketType.PubAck: case MqttControlPacketType.PubAck:
{ {
return new MqttPubAckPacket return new MqttPubAckPacket
{ {
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
PacketIdentifier = mqttPacketReader.ReadUInt16()
}; };
} }


@@ -136,7 +157,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubRecPacket return new MqttPubRecPacket
{ {
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
PacketIdentifier = mqttPacketReader.ReadUInt16()
}; };
} }


@@ -144,7 +165,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubRelPacket return new MqttPubRelPacket
{ {
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
PacketIdentifier = mqttPacketReader.ReadUInt16()
}; };
} }


@@ -152,7 +173,7 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttPubCompPacket return new MqttPubCompPacket
{ {
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
PacketIdentifier = mqttPacketReader.ReadUInt16()
}; };
} }


@@ -185,28 +206,39 @@ namespace MQTTnet.Core.Serializer
{ {
return new MqttUnsubAckPacket return new MqttUnsubAckPacket
{ {
PacketIdentifier = mqttPacketReader.ReadRemainingDataUShort()
PacketIdentifier = mqttPacketReader.ReadUInt16()
}; };
} }


default: default:
{ {
throw new MqttProtocolViolationException($"Packet type ({(int)mqttPacketReader.ControlPacketType}) not supported.");
throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
} }
} }
} }
} }


private async Task<MemoryStream> GetBody(IMqttCommunicationChannel source, MqttPacketHeader header)
{
if (header.BodyLength > 0)
{
var segment = await MqttPacketReader.ReadFromSourceAsync(source, header.BodyLength, _readBuffer).ConfigureAwait(false);
return new MemoryStream(segment.Array, segment.Offset, segment.Count);
}

return new MemoryStream();
}

private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader) private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
{ {
var packet = new MqttUnsubscribePacket var packet = new MqttUnsubscribePacket
{ {
PacketIdentifier = reader.ReadRemainingDataUShort(),
PacketIdentifier = reader.ReadUInt16(),
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.TopicFilters.Add(reader.ReadRemainingDataStringWithLengthPrefix());
packet.TopicFilters.Add(reader.ReadStringWithLengthPrefix());
} }


return packet; return packet;
@@ -216,32 +248,32 @@ namespace MQTTnet.Core.Serializer
{ {
var packet = new MqttSubscribePacket var packet = new MqttSubscribePacket
{ {
PacketIdentifier = reader.ReadRemainingDataUShort()
PacketIdentifier = reader.ReadUInt16()
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.TopicFilters.Add(new TopicFilter( packet.TopicFilters.Add(new TopicFilter(
reader.ReadRemainingDataStringWithLengthPrefix(),
(MqttQualityOfServiceLevel)reader.ReadRemainingDataByte()));
reader.ReadStringWithLengthPrefix(),
(MqttQualityOfServiceLevel)reader.ReadByte()));
} }


return packet; return packet;
} }


private static MqttBasePacket DeserializePublish(MqttPacketReader reader)
private static MqttBasePacket DeserializePublish(MqttPacketReader reader, MqttPacketHeader mqttPacketHeader)
{ {
var fixedHeader = new ByteReader(reader.FixedHeader);
var fixedHeader = new ByteReader(mqttPacketHeader.FixedHeader);
var retain = fixedHeader.Read(); var retain = fixedHeader.Read();
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read(); var dup = fixedHeader.Read();


var topic = reader.ReadRemainingDataStringWithLengthPrefix();
var topic = reader.ReadStringWithLengthPrefix();


ushort packetIdentifier = 0; ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{ {
packetIdentifier = reader.ReadRemainingDataUShort();
packetIdentifier = reader.ReadUInt16();
} }


var packet = new MqttPublishPacket var packet = new MqttPublishPacket
@@ -259,13 +291,13 @@ namespace MQTTnet.Core.Serializer


private static MqttBasePacket DeserializeConnect(MqttPacketReader reader) private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
{ {
reader.ReadRemainingData(2); // Skip 2 bytes
reader.ReadBytes(2); // Skip 2 bytes


MqttProtocolVersion protocolVersion; MqttProtocolVersion protocolVersion;
var protocolName = reader.ReadRemainingData(4);
var protocolName = reader.ReadBytes(4);
if (protocolName.SequenceEqual(ProtocolVersionV310Name)) if (protocolName.SequenceEqual(ProtocolVersionV310Name))
{ {
reader.ReadRemainingData(2);
reader.ReadBytes(2);
protocolVersion = MqttProtocolVersion.V310; protocolVersion = MqttProtocolVersion.V310;
} }
else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) else if (protocolName.SequenceEqual(ProtocolVersionV311Name))
@@ -277,8 +309,8 @@ namespace MQTTnet.Core.Serializer
throw new MqttProtocolViolationException("Protocol name is not supported."); throw new MqttProtocolViolationException("Protocol name is not supported.");
} }


var protocolLevel = reader.ReadRemainingDataByte();
var connectFlags = reader.ReadRemainingDataByte();
var protocolLevel = reader.ReadByte();
var connectFlags = reader.ReadByte();


var connectFlagsReader = new ByteReader(connectFlags); var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved. connectFlagsReader.Read(); // Reserved.
@@ -295,26 +327,26 @@ namespace MQTTnet.Core.Serializer
var passwordFlag = connectFlagsReader.Read(); var passwordFlag = connectFlagsReader.Read();
var usernameFlag = connectFlagsReader.Read(); var usernameFlag = connectFlagsReader.Read();


packet.KeepAlivePeriod = reader.ReadRemainingDataUShort();
packet.ClientId = reader.ReadRemainingDataStringWithLengthPrefix();
packet.KeepAlivePeriod = reader.ReadUInt16();
packet.ClientId = reader.ReadStringWithLengthPrefix();


if (willFlag) if (willFlag)
{ {
packet.WillMessage = new MqttApplicationMessage( packet.WillMessage = new MqttApplicationMessage(
reader.ReadRemainingDataStringWithLengthPrefix(),
reader.ReadRemainingDataWithLengthPrefix(),
reader.ReadStringWithLengthPrefix(),
reader.ReadWithLengthPrefix(),
(MqttQualityOfServiceLevel)willQoS, (MqttQualityOfServiceLevel)willQoS,
willRetain); willRetain);
} }


if (usernameFlag) if (usernameFlag)
{ {
packet.Username = reader.ReadRemainingDataStringWithLengthPrefix();
packet.Username = reader.ReadStringWithLengthPrefix();
} }


if (passwordFlag) if (passwordFlag)
{ {
packet.Password = reader.ReadRemainingDataStringWithLengthPrefix();
packet.Password = reader.ReadStringWithLengthPrefix();
} }


ValidateConnectPacket(packet); ValidateConnectPacket(packet);
@@ -325,12 +357,12 @@ namespace MQTTnet.Core.Serializer
{ {
var packet = new MqttSubAckPacket var packet = new MqttSubAckPacket
{ {
PacketIdentifier = reader.ReadRemainingDataUShort()
PacketIdentifier = reader.ReadUInt16()
}; };


while (!reader.EndOfRemainingData) while (!reader.EndOfRemainingData)
{ {
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadRemainingDataByte());
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadByte());
} }


return packet; return packet;
@@ -338,8 +370,8 @@ namespace MQTTnet.Core.Serializer


private static MqttBasePacket DeserializeConnAck(MqttPacketReader reader) private static MqttBasePacket DeserializeConnAck(MqttPacketReader reader)
{ {
var variableHeader1 = reader.ReadRemainingDataByte();
var variableHeader2 = reader.ReadRemainingDataByte();
var variableHeader1 = reader.ReadByte();
var variableHeader2 = reader.ReadByte();


var packet = new MqttConnAckPacket var packet = new MqttConnAckPacket
{ {
@@ -366,260 +398,212 @@ namespace MQTTnet.Core.Serializer
} }
} }


private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
private byte Serialize(MqttConnectPacket packet, MqttPacketWriter writer)
{ {
ValidateConnectPacket(packet); ValidateConnectPacket(packet);


using (var output = new MqttPacketWriter())
// Write variable header
writer.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
if (ProtocolVersion == MqttProtocolVersion.V311)
{ {
// Write variable header
output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
if (ProtocolVersion == MqttProtocolVersion.V311)
{
output.Write(ProtocolVersionV311Name);
output.Write(0x04); // 3.1.2.2 Protocol Level (4)
}
else
{
output.Write(ProtocolVersionV310Name);
output.Write(0x64);
output.Write(0x70);
output.Write(0x03); // Protocol Level (3)
}
var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
connectFlags.Write(false); // Reserved
connectFlags.Write(packet.CleanSession);
connectFlags.Write(packet.WillMessage != null);
writer.Write(ProtocolVersionV311Name);
writer.Write(0x04); // 3.1.2.2 Protocol Level (4)
}
else
{
writer.Write(ProtocolVersionV310Name);
writer.Write(0x64);
writer.Write(0x70);
writer.Write(0x03); // Protocol Level (3)
}


if (packet.WillMessage != null)
{
connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
connectFlags.Write(packet.WillMessage.Retain);
}
else
{
connectFlags.Write(0, 2);
connectFlags.Write(false);
}
connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null);
var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
connectFlags.Write(false); // Reserved
connectFlags.Write(packet.CleanSession);
connectFlags.Write(packet.WillMessage != null);


output.Write(connectFlags);
output.Write(packet.KeepAlivePeriod);
output.WriteWithLengthPrefix(packet.ClientId);
if (packet.WillMessage != null)
{
connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
connectFlags.Write(packet.WillMessage.Retain);
}
else
{
connectFlags.Write(0, 2);
connectFlags.Write(false);
}


if (packet.WillMessage != null)
{
output.WriteWithLengthPrefix(packet.WillMessage.Topic);
output.WriteWithLengthPrefix(packet.WillMessage.Payload);
}
connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null);


if (packet.Username != null)
{
output.WriteWithLengthPrefix(packet.Username);
}
writer.Write(connectFlags);
writer.Write(packet.KeepAlivePeriod);
writer.WriteWithLengthPrefix(packet.ClientId);


if (packet.Password != null)
{
output.WriteWithLengthPrefix(packet.Password);
}
if (packet.WillMessage != null)
{
writer.WriteWithLengthPrefix(packet.WillMessage.Topic);
writer.WriteWithLengthPrefix(packet.WillMessage.Payload);
}


output.InjectFixedHeader(MqttControlPacketType.Connect);
return output.WriteToAsync(destination);
if (packet.Username != null)
{
writer.WriteWithLengthPrefix(packet.Username);
} }

if (packet.Password != null)
{
writer.WriteWithLengthPrefix(packet.Password);
}

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
} }


private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
var connectAcknowledgeFlags = new ByteWriter();

if (ProtocolVersion == MqttProtocolVersion.V311)
{ {
var connectAcknowledgeFlags = new ByteWriter();
connectAcknowledgeFlags.Write(packet.IsSessionPresent);
}


if (ProtocolVersion == MqttProtocolVersion.V311)
{
connectAcknowledgeFlags.Write(packet.IsSessionPresent);
}
output.Write(connectAcknowledgeFlags);
output.Write((byte)packet.ConnectReturnCode);
writer.Write(connectAcknowledgeFlags);
writer.Write((byte)packet.ConnectReturnCode);


output.InjectFixedHeader(MqttControlPacketType.ConnAck);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
} }


private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02);
await output.WriteToAsync(destination).ConfigureAwait(false);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
} }


private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttDisconnectPacket packet, MqttPacketWriter writer)
{ {
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, writer);
} }


private static Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPingReqPacket packet, MqttPacketWriter writer)
{ {
return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, writer);
} }


private static Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPingRespPacket packet, MqttPacketWriter writer)
{ {
return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, writer);
} }


private static Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter writer)
{ {
ValidatePublishPacket(packet); ValidatePublishPacket(packet);


using (var output = new MqttPacketWriter())
{
output.WriteWithLengthPrefix(packet.Topic);
writer.WriteWithLengthPrefix(packet.Topic);


if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
output.Write(packet.PacketIdentifier);
}
else
if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
writer.Write(packet.PacketIdentifier);
}
else
{
if (packet.PacketIdentifier > 0)
{ {
if (packet.PacketIdentifier > 0)
{
throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
}
throw new MqttProtocolViolationException("Packet identifier must be empty if QoS == 0 [MQTT-2.3.1-5].");
} }
}


if (packet.Payload?.Length > 0)
{
output.Write(packet.Payload);
}
if (packet.Payload?.Length > 0)
{
writer.Write(packet.Payload);
}


var fixedHeader = new ByteWriter();
fixedHeader.Write(packet.Retain);
fixedHeader.Write((byte)packet.QualityOfServiceLevel, 2);
fixedHeader.Write(packet.Dup);
var fixedHeader = new ByteWriter();
fixedHeader.Write(packet.Retain);
fixedHeader.Write((byte)packet.QualityOfServiceLevel, 2);
fixedHeader.Write(packet.Dup);


output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value);
} }


private static Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubAck);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
} }


private static Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubRec);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
} }


private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubComp);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
} }


private static Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


if (packet.TopicFilters?.Count > 0)
if (packet.TopicFilters?.Count > 0)
{
foreach (var topicFilter in packet.TopicFilters)
{ {
foreach (var topicFilter in packet.TopicFilters)
{
output.WriteWithLengthPrefix(topicFilter.Topic);
output.Write((byte)topicFilter.QualityOfServiceLevel);
}
writer.WriteWithLengthPrefix(topicFilter.Topic);
writer.Write((byte)topicFilter.QualityOfServiceLevel);
} }

output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02);
return output.WriteToAsync(destination);
} }

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
} }


private static Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


if (packet.SubscribeReturnCodes?.Any() == true)
if (packet.SubscribeReturnCodes?.Any() == true)
{
foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
{ {
foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
{
output.Write((byte)packetSubscribeReturnCode);
}
writer.Write((byte)packetSubscribeReturnCode);
} }

output.InjectFixedHeader(MqttControlPacketType.SubAck);
return output.WriteToAsync(destination);
} }

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
} }


private static Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


if (packet.TopicFilters?.Any() == true)
if (packet.TopicFilters?.Any() == true)
{
foreach (var topicFilter in packet.TopicFilters)
{ {
foreach (var topicFilter in packet.TopicFilters)
{
output.WriteWithLengthPrefix(topicFilter);
}
writer.WriteWithLengthPrefix(topicFilter);
} }

output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
return output.WriteToAsync(destination);
} }

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
} }


private static Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
private static byte Serialize(MqttUnsubAckPacket packet, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
writer.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.UnsubAck);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
} }


private static Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
private static byte SerializeEmptyPacketAsync(MqttControlPacketType type, MqttPacketWriter writer)
{ {
using (var output = new MqttPacketWriter())
{
output.InjectFixedHeader(type);
return output.WriteToAsync(destination);
}
return MqttPacketWriter.BuildFixedHeader(type);
} }
} }
} }

+ 23
- 49
MQTTnet.Core/Serializer/MqttPacketWriter.cs View File

@@ -1,49 +1,45 @@
using System; using System;
using System.IO; using System.IO;
using System.Text; using System.Text;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using System.Collections.Generic;


namespace MQTTnet.Core.Serializer namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketWriter : IDisposable
public sealed class MqttPacketWriter : BinaryWriter
{ {
private readonly MemoryStream _buffer = new MemoryStream(1024);

public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
public MqttPacketWriter( Stream stream )
: base(stream)
{ {
var fixedHeader = (int)packetType << 4;
fixedHeader |= flags;
InjectFixedHeader((byte)fixedHeader);

} }


public void Write(byte value)
public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{ {
_buffer.WriteByte(value);
var fixedHeader = (int)packetType << 4;
fixedHeader |= flags;
return (byte)fixedHeader;
} }

public void Write(ushort value)
public override void Write(ushort value)
{ {
var buffer = BitConverter.GetBytes(value); var buffer = BitConverter.GetBytes(value);
_buffer.WriteByte(buffer[1]);
_buffer.WriteByte(buffer[0]);
Write(buffer[1]);
Write(buffer[0]);
} }


public void Write(ByteWriter value)
public new void Write(params byte[] values)
{ {
if (value == null) throw new ArgumentNullException(nameof(value));

_buffer.WriteByte(value.Value);
base.Write(values);
} }


public void Write(params byte[] value)
public void Write(ByteWriter value)
{ {
if (value == null) throw new ArgumentNullException(nameof(value)); if (value == null) throw new ArgumentNullException(nameof(value));


_buffer.Write(value, 0, value.Length);
Write(value.Value);
} }
public void WriteWithLengthPrefix(string value) public void WriteWithLengthPrefix(string value)
{ {
WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty)); WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty));
@@ -57,36 +53,16 @@ namespace MQTTnet.Core.Serializer
Write(value); Write(value);
} }


public Task WriteToAsync(IMqttCommunicationChannel destination)
public static void BuildLengthHeader(int length, List<byte> header)
{ {
if (destination == null) throw new ArgumentNullException(nameof(destination));

return destination.WriteAsync(_buffer.ToArray());
}

public void Dispose()
{
_buffer?.Dispose();
}

private void InjectFixedHeader(byte fixedHeader)
{
if (_buffer.Length == 0)
if (length == 0)
{ {
Write(fixedHeader);
Write(0);
header.Add(0);
return; return;
} }


var backupBuffer = _buffer.ToArray();
var remainingLength = (int)_buffer.Length;

_buffer.SetLength(0);

_buffer.WriteByte(fixedHeader);

// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = remainingLength;
var x = length;
do do
{ {
var encodedByte = x % 128; var encodedByte = x % 128;
@@ -96,10 +72,8 @@ namespace MQTTnet.Core.Serializer
encodedByte = encodedByte | 128; encodedByte = encodedByte | 128;
} }


_buffer.WriteByte((byte)encodedByte);
header.Add((byte)encodedByte);
} while (x > 0); } while (x > 0);

_buffer.Write(backupBuffer, 0, backupBuffer.Length);
} }
} }
} }

+ 8
- 2
Tests/MQTTnet.Core.Tests/MqttPacketSerializerTests.cs View File

@@ -418,15 +418,21 @@ namespace MQTTnet.Core.Tests
return _stream.WriteAsync(buffer, 0, buffer.Length); return _stream.WriteAsync(buffer, 0, buffer.Length);
} }


public Task ReadAsync(byte[] buffer)
public async Task<ArraySegment<byte>> ReadAsync(int length, byte[] buffer)
{ {
return _stream.ReadAsync(buffer, 0, buffer.Length);
await _stream.ReadAsync(buffer, 0, length);
return new ArraySegment<byte>(buffer, 0, length);
} }


public byte[] ToArray() public byte[] ToArray()
{ {
return _stream.ToArray(); return _stream.ToArray();
} }

public int Peek()
{
return (int)_stream.Length - (int)_stream.Position;
}
} }


private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311) private static void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311)


+ 1
- 0
Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj View File

@@ -82,6 +82,7 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="PerformanceTest.cs" />
<Compile Include="Program.cs" /> <Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup> </ItemGroup>


+ 165
- 0
Tests/MQTTnet.TestApp.NetFramework/PerformanceTest.cs View File

@@ -0,0 +1,165 @@
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQTTnet.TestApp.NetFramework
{
public static class PerformanceTest
{
public static async Task RunAsync()
{
var server = Task.Run(() => RunServerAsync());
var client = Task.Run(() => RunClientAsync(50, TimeSpan.FromMilliseconds(10)));

await Task.WhenAll(server, client).ConfigureAwait(false);
}

private static async Task RunClientAsync( int msgChunkSize, TimeSpan interval )
{
try
{
var options = new MqttClientOptions
{
Server = "localhost",
ClientId = "XYZ",
CleanSession = true,
DefaultCommunicationTimeout = TimeSpan.FromMinutes(10)
};

var client = new MqttClientFactory().CreateMqttClient(options);
client.ApplicationMessageReceived += (s, e) =>
{
};

client.Connected += async (s, e) =>
{
Console.WriteLine("### CONNECTED WITH SERVER ###");

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
});

Console.WriteLine("### SUBSCRIBED ###");
};

client.Disconnected += async (s, e) =>
{
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));

try
{
await client.ConnectAsync();
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
};

try
{
await client.ConnectAsync();
}
catch (Exception exception)
{
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
}

Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

var last = DateTime.Now;
var msgs = 0;

while (true)
{
for (int i = 0; i < msgChunkSize; i++)
{
var applicationMessage = new MqttApplicationMessage(
"A/B/C",
Encoding.UTF8.GetBytes("Hello World"),
MqttQualityOfServiceLevel.AtLeastOnce,
false
);

//do not await to send as much messages as possible
await client.PublishAsync(applicationMessage);
msgs++;
}

var now = DateTime.Now;
if (last < now - TimeSpan.FromSeconds(1))
{
Console.WriteLine( $"sending {msgs} inteded {msgChunkSize / interval.TotalSeconds}" );
msgs = 0;
last = now;
}

await Task.Delay(interval).ConfigureAwait(false);
}
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
}

private static void RunServerAsync()
{
try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}

return MqttConnectReturnCode.ConnectionAccepted;
},
DefaultCommunicationTimeout = TimeSpan.FromMinutes(10)
};
var mqttServer = new MqttServerFactory().CreateMqttServer(options);
var last = DateTime.Now;
var msgs = 0;
mqttServer.ApplicationMessageReceived += (sender, args) =>
{
msgs++;
var now = DateTime.Now;
if (last < now - TimeSpan.FromSeconds(1))
{
Console.WriteLine($"received {msgs}");
msgs = 0;
last = now;
}
};
mqttServer.Start();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();

mqttServer.Stop();
}
catch (Exception e)
{
Console.WriteLine(e);
}

Console.ReadLine();
}
}
}

+ 6
- 0
Tests/MQTTnet.TestApp.NetFramework/Program.cs View File

@@ -19,6 +19,7 @@ namespace MQTTnet.TestApp.NetFramework
Console.WriteLine("MQTTnet - TestApp.NetFramework"); Console.WriteLine("MQTTnet - TestApp.NetFramework");
Console.WriteLine("1 = Start client"); Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server"); Console.WriteLine("2 = Start server");
Console.WriteLine("3 = Start performance test");
var pressedKey = Console.ReadKey(true); var pressedKey = Console.ReadKey(true);
if (pressedKey.Key == ConsoleKey.D1) if (pressedKey.Key == ConsoleKey.D1)
{ {
@@ -30,6 +31,11 @@ namespace MQTTnet.TestApp.NetFramework
Task.Run(() => RunServerAsync(args)); Task.Run(() => RunServerAsync(args));
Thread.Sleep(Timeout.Infinite); Thread.Sleep(Timeout.Infinite);
} }
else if (pressedKey.Key == ConsoleKey.D3)
{
Task.Run(() => PerformanceTest.RunAsync());
Thread.Sleep(Timeout.Infinite);
}
} }


private static async Task RunClientAsync(string[] arguments) private static async Task RunClientAsync(string[] arguments)


Loading…
Cancel
Save