Browse Source

Merge pull request #257 from JanEggers/AllocationBenchmark

[wip] improved serializer performance
release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
e0899abaac
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 165 deletions
  1. +17
    -20
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs
  2. +111
    -120
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
  3. +11
    -25
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs

+ 17
- 20
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketReader.cs View File

@@ -10,18 +10,8 @@ using MQTTnet.Protocol;

namespace MQTTnet.Serializer
{
public sealed class MqttPacketReader : BinaryReader
public static class MqttPacketReader
{
private readonly MqttPacketHeader _header;

public MqttPacketReader(MqttPacketHeader header, Stream bodyStream)
: base(bodyStream, Encoding.UTF8, true)
{
_header = header;
}

public bool EndOfRemainingData => BaseStream.Position == _header.BodyLength;

public static async Task<MqttPacketHeader> ReadHeaderAsync(IMqttChannel stream, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -51,9 +41,9 @@ namespace MQTTnet.Serializer
};
}

public override ushort ReadUInt16()
public static ushort ReadUInt16(this Stream stream)
{
var buffer = ReadBytes(2);
var buffer = stream.ReadBytes(2);

var temp = buffer[0];
buffer[0] = buffer[1];
@@ -62,9 +52,9 @@ namespace MQTTnet.Serializer
return BitConverter.ToUInt16(buffer, 0);
}

public string ReadStringWithLengthPrefix()
public static string ReadStringWithLengthPrefix(this Stream stream)
{
var buffer = ReadWithLengthPrefix();
var buffer = stream.ReadWithLengthPrefix();
if (buffer.Length == 0)
{
return string.Empty;
@@ -73,20 +63,27 @@ namespace MQTTnet.Serializer
return Encoding.UTF8.GetString(buffer, 0, buffer.Length);
}

public byte[] ReadWithLengthPrefix()
public static byte[] ReadWithLengthPrefix(this Stream stream)
{
var length = ReadUInt16();
var length = stream.ReadUInt16();
if (length == 0)
{
return new byte[0];
}

return ReadBytes(length);
return stream.ReadBytes(length);
}

public static byte[] ReadRemainingData(this Stream stream, MqttPacketHeader header)
{
return stream.ReadBytes(header.BodyLength - (int)stream.Position);
}

public byte[] ReadRemainingData()
public static byte[] ReadBytes(this Stream stream, int count)
{
return ReadBytes(_header.BodyLength - (int)BaseStream.Position);
var buffer = new byte[count];
stream.Read(buffer, 0, count);
return buffer;
}

private static async Task<int> ReadBodyLengthAsync(IMqttChannel stream, CancellationToken cancellationToken)


+ 111
- 120
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs View File

@@ -20,11 +20,10 @@ namespace MQTTnet.Serializer
if (packet == null) throw new ArgumentNullException(nameof(packet));

using (var stream = new MemoryStream(128))
using (var writer = new MqttPacketWriter(stream))
{
// Leave enough head space for max header size (fixed + 4 variable remaining length)
stream.Position = 5;
var fixedHeader = SerializePacket(packet, writer);
var fixedHeader = SerializePacket(packet, stream);

stream.Position = 1;
var remainingLength = MqttPacketWriter.EncodeRemainingLength((int)stream.Length - 5, stream);
@@ -35,7 +34,7 @@ namespace MQTTnet.Serializer
// Position cursor on correct offset on beginining of array (has leading 0x0)
stream.Position = headerOffset;

writer.Write(fixedHeader);
stream.WriteByte(fixedHeader);

#if NET461 || NET452 || NETSTANDARD2_0
var buffer = stream.GetBuffer();
@@ -46,146 +45,138 @@ namespace MQTTnet.Serializer
}
}

public MqttBasePacket Deserialize(MqttPacketHeader header, Stream body)
{
if (header == null) throw new ArgumentNullException(nameof(header));
if (body == null) throw new ArgumentNullException(nameof(body));

using (var reader = new MqttPacketReader(header, body))
{
return Deserialize(header, reader);
}
}

private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter writer)
private byte SerializePacket(MqttBasePacket packet, Stream stream)
{
switch (packet)
{
case MqttConnectPacket connectPacket: return Serialize(connectPacket, writer);
case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, writer);
case MqttConnectPacket connectPacket: return Serialize(connectPacket, stream);
case MqttConnAckPacket connAckPacket: return Serialize(connAckPacket, stream);
case MqttDisconnectPacket _: return SerializeEmptyPacket(MqttControlPacketType.Disconnect);
case MqttPingReqPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingReq);
case MqttPingRespPacket _: return SerializeEmptyPacket(MqttControlPacketType.PingResp);
case MqttPublishPacket publishPacket: return Serialize(publishPacket, writer);
case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, writer);
case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, writer);
case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, writer);
case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, writer);
case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, writer);
case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, writer);
case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, writer);
case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, writer);
case MqttPublishPacket publishPacket: return Serialize(publishPacket, stream);
case MqttPubAckPacket pubAckPacket: return Serialize(pubAckPacket, stream);
case MqttPubRecPacket pubRecPacket: return Serialize(pubRecPacket, stream);
case MqttPubRelPacket pubRelPacket: return Serialize(pubRelPacket, stream);
case MqttPubCompPacket pubCompPacket: return Serialize(pubCompPacket, stream);
case MqttSubscribePacket subscribePacket: return Serialize(subscribePacket, stream);
case MqttSubAckPacket subAckPacket: return Serialize(subAckPacket, stream);
case MqttUnsubscribePacket unsubscribePacket: return Serialize(unsubscribePacket, stream);
case MqttUnsubAckPacket unsubAckPacket: return Serialize(unsubAckPacket, stream);
default: throw new MqttProtocolViolationException("Packet type invalid.");
}
}

private MqttBasePacket Deserialize(MqttPacketHeader header, MqttPacketReader reader)
public MqttBasePacket Deserialize(MqttPacketHeader header, Stream stream)
{
if (header == null) throw new ArgumentNullException(nameof(header));
if (stream == null) throw new ArgumentNullException(nameof(stream));

switch (header.ControlPacketType)
{
case MqttControlPacketType.Connect: return DeserializeConnect(reader);
case MqttControlPacketType.ConnAck: return DeserializeConnAck(reader);
case MqttControlPacketType.Connect: return DeserializeConnect(stream);
case MqttControlPacketType.ConnAck: return DeserializeConnAck(stream);
case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket();
case MqttControlPacketType.Publish: return DeserializePublish(reader, header);
case MqttControlPacketType.PubAck: return DeserializePubAck(reader);
case MqttControlPacketType.PubRec: return DeserializePubRec(reader);
case MqttControlPacketType.PubRel: return DeserializePubRel(reader);
case MqttControlPacketType.PubComp: return DeserializePubComp(reader);
case MqttControlPacketType.Publish: return DeserializePublish(stream, header);
case MqttControlPacketType.PubAck: return DeserializePubAck(stream);
case MqttControlPacketType.PubRec: return DeserializePubRec(stream);
case MqttControlPacketType.PubRel: return DeserializePubRel(stream);
case MqttControlPacketType.PubComp: return DeserializePubComp(stream);
case MqttControlPacketType.PingReq: return new MqttPingReqPacket();
case MqttControlPacketType.PingResp: return new MqttPingRespPacket();
case MqttControlPacketType.Subscribe: return DeserializeSubscribe(reader);
case MqttControlPacketType.SubAck: return DeserializeSubAck(reader);
case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(reader);
case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(reader);
case MqttControlPacketType.Subscribe: return DeserializeSubscribe(stream, header);
case MqttControlPacketType.SubAck: return DeserializeSubAck(stream, header);
case MqttControlPacketType.Unsubscibe: return DeserializeUnsubscribe(stream, header);
case MqttControlPacketType.UnsubAck: return DeserializeUnsubAck(stream);
default: throw new MqttProtocolViolationException($"Packet type ({(int)header.ControlPacketType}) not supported.");
}
}

private static MqttBasePacket DeserializeUnsubAck(MqttPacketReader reader)
private static MqttBasePacket DeserializeUnsubAck(Stream stream)
{
return new MqttUnsubAckPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};
}

private static MqttBasePacket DeserializePubComp(MqttPacketReader reader)
private static MqttBasePacket DeserializePubComp(Stream stream)
{
return new MqttPubCompPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};
}

private static MqttBasePacket DeserializePubRel(MqttPacketReader reader)
private static MqttBasePacket DeserializePubRel(Stream stream)
{
return new MqttPubRelPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};
}

private static MqttBasePacket DeserializePubRec(MqttPacketReader reader)
private static MqttBasePacket DeserializePubRec(Stream stream)
{
return new MqttPubRecPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};
}

private static MqttBasePacket DeserializePubAck(MqttPacketReader reader)
private static MqttBasePacket DeserializePubAck(Stream stream)
{
return new MqttPubAckPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};
}

private static MqttBasePacket DeserializeUnsubscribe(MqttPacketReader reader)
private static MqttBasePacket DeserializeUnsubscribe(Stream stream, MqttPacketHeader header)
{
var packet = new MqttUnsubscribePacket
{
PacketIdentifier = reader.ReadUInt16(),
PacketIdentifier = stream.ReadUInt16(),
};

while (!reader.EndOfRemainingData)
while (stream.Position != header.BodyLength)
{
packet.TopicFilters.Add(reader.ReadStringWithLengthPrefix());
packet.TopicFilters.Add(stream.ReadStringWithLengthPrefix());
}

return packet;
}

private static MqttBasePacket DeserializeSubscribe(MqttPacketReader reader)
private static MqttBasePacket DeserializeSubscribe(Stream stream, MqttPacketHeader header)
{
var packet = new MqttSubscribePacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};

while (!reader.EndOfRemainingData)
while (stream.Position != header.BodyLength)
{
packet.TopicFilters.Add(new TopicFilter(
reader.ReadStringWithLengthPrefix(),
(MqttQualityOfServiceLevel)reader.ReadByte()));
stream.ReadStringWithLengthPrefix(),
(MqttQualityOfServiceLevel)stream.ReadByte()));
}

return packet;
}

private static MqttBasePacket DeserializePublish(MqttPacketReader reader, MqttPacketHeader mqttPacketHeader)
private static MqttBasePacket DeserializePublish(Stream stream, MqttPacketHeader mqttPacketHeader)
{
var fixedHeader = new ByteReader(mqttPacketHeader.FixedHeader);
var retain = fixedHeader.Read();
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2);
var dup = fixedHeader.Read();

var topic = reader.ReadStringWithLengthPrefix();
var topic = stream.ReadStringWithLengthPrefix();

ushort? packetIdentifier = null;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
packetIdentifier = reader.ReadUInt16();
packetIdentifier = stream.ReadUInt16();
}

var packet = new MqttPublishPacket
@@ -193,7 +184,7 @@ namespace MQTTnet.Serializer
PacketIdentifier = packetIdentifier,
Retain = retain,
Topic = topic,
Payload = reader.ReadRemainingData(),
Payload = stream.ReadRemainingData(mqttPacketHeader),
QualityOfServiceLevel = qualityOfServiceLevel,
Dup = dup
};
@@ -201,12 +192,12 @@ namespace MQTTnet.Serializer
return packet;
}

private static MqttBasePacket DeserializeConnect(MqttPacketReader reader)
private static MqttBasePacket DeserializeConnect(Stream stream)
{
reader.ReadBytes(2); // Skip 2 bytes for header and remaining length.
stream.ReadBytes(2); // Skip 2 bytes for header and remaining length.

MqttProtocolVersion protocolVersion;
var protocolName = reader.ReadBytes(4);
var protocolName = stream.ReadBytes(4);

if (protocolName.SequenceEqual(ProtocolVersionV311Name))
{
@@ -216,7 +207,7 @@ namespace MQTTnet.Serializer
{
var buffer = new byte[6];
Array.Copy(protocolName, buffer, 4);
protocolName = reader.ReadBytes(2);
protocolName = stream.ReadBytes(2);
Array.Copy(protocolName, 0, buffer, 4, 2);

if (protocolName.SequenceEqual(ProtocolVersionV310Name))
@@ -229,8 +220,8 @@ namespace MQTTnet.Serializer
}
}

reader.ReadByte(); // Skip protocol level
var connectFlags = reader.ReadByte();
stream.ReadByte(); // Skip protocol level
var connectFlags = stream.ReadByte();

var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved.
@@ -247,15 +238,15 @@ namespace MQTTnet.Serializer
var passwordFlag = connectFlagsReader.Read();
var usernameFlag = connectFlagsReader.Read();

packet.KeepAlivePeriod = reader.ReadUInt16();
packet.ClientId = reader.ReadStringWithLengthPrefix();
packet.KeepAlivePeriod = stream.ReadUInt16();
packet.ClientId = stream.ReadStringWithLengthPrefix();

if (willFlag)
{
packet.WillMessage = new MqttApplicationMessage
{
Topic = reader.ReadStringWithLengthPrefix(),
Payload = reader.ReadWithLengthPrefix(),
Topic = stream.ReadStringWithLengthPrefix(),
Payload = stream.ReadWithLengthPrefix(),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)willQoS,
Retain = willRetain
};
@@ -263,45 +254,45 @@ namespace MQTTnet.Serializer

if (usernameFlag)
{
packet.Username = reader.ReadStringWithLengthPrefix();
packet.Username = stream.ReadStringWithLengthPrefix();
}

if (passwordFlag)
{
packet.Password = reader.ReadStringWithLengthPrefix();
packet.Password = stream.ReadStringWithLengthPrefix();
}

ValidateConnectPacket(packet);
return packet;
}

private static MqttBasePacket DeserializeSubAck(MqttPacketReader reader)
private static MqttBasePacket DeserializeSubAck(Stream stream, MqttPacketHeader header)
{
var packet = new MqttSubAckPacket
{
PacketIdentifier = reader.ReadUInt16()
PacketIdentifier = stream.ReadUInt16()
};

while (!reader.EndOfRemainingData)
while (stream.Position != header.BodyLength)
{
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)reader.ReadByte());
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)stream.ReadByte());
}

return packet;
}

private MqttBasePacket DeserializeConnAck(MqttPacketReader reader)
private MqttBasePacket DeserializeConnAck(Stream stream)
{
var packet = new MqttConnAckPacket();

var firstByteReader = new ByteReader(reader.ReadByte());
var firstByteReader = new ByteReader(stream.ReadByte());

if (ProtocolVersion == MqttProtocolVersion.V311)
{
packet.IsSessionPresent = firstByteReader.Read();
}

packet.ConnectReturnCode = (MqttConnectReturnCode)reader.ReadByte();
packet.ConnectReturnCode = (MqttConnectReturnCode)stream.ReadByte();

return packet;
}
@@ -326,20 +317,20 @@ namespace MQTTnet.Serializer
}
}

private byte Serialize(MqttConnectPacket packet, MqttPacketWriter writer)
private byte Serialize(MqttConnectPacket packet, Stream stream)
{
ValidateConnectPacket(packet);

// Write variable header
if (ProtocolVersion == MqttProtocolVersion.V311)
{
writer.WriteWithLengthPrefix(ProtocolVersionV311Name);
writer.Write(0x04); // 3.1.2.2 Protocol Level 4
stream.WriteWithLengthPrefix(ProtocolVersionV311Name);
stream.WriteByte(0x04); // 3.1.2.2 Protocol Level 4
}
else
{
writer.WriteWithLengthPrefix(ProtocolVersionV310Name);
writer.Write(0x03); // Protocol Level 3
stream.WriteWithLengthPrefix(ProtocolVersionV310Name);
stream.WriteByte(0x03); // Protocol Level 3
}

var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
@@ -366,68 +357,68 @@ namespace MQTTnet.Serializer
connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null);

writer.Write(connectFlags);
writer.Write(packet.KeepAlivePeriod);
writer.WriteWithLengthPrefix(packet.ClientId);
stream.Write(connectFlags);
stream.Write(packet.KeepAlivePeriod);
stream.WriteWithLengthPrefix(packet.ClientId);

if (packet.WillMessage != null)
{
writer.WriteWithLengthPrefix(packet.WillMessage.Topic);
writer.WriteWithLengthPrefix(packet.WillMessage.Payload);
stream.WriteWithLengthPrefix(packet.WillMessage.Topic);
stream.WriteWithLengthPrefix(packet.WillMessage.Payload);
}

if (packet.Username != null)
{
writer.WriteWithLengthPrefix(packet.Username);
stream.WriteWithLengthPrefix(packet.Username);
}

if (packet.Password != null)
{
writer.WriteWithLengthPrefix(packet.Password);
stream.WriteWithLengthPrefix(packet.Password);
}

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
}

private byte Serialize(MqttConnAckPacket packet, MqttPacketWriter writer)
private byte Serialize(MqttConnAckPacket packet, Stream stream)
{
if (ProtocolVersion == MqttProtocolVersion.V310)
{
writer.Write(0);
stream.WriteByte(0);
}
else if (ProtocolVersion == MqttProtocolVersion.V311)
{
var connectAcknowledgeFlags = new ByteWriter();
connectAcknowledgeFlags.Write(packet.IsSessionPresent);
writer.Write(connectAcknowledgeFlags);
stream.Write(connectAcknowledgeFlags);
}
else
{
throw new MqttProtocolViolationException("Protocol version not supported.");
}

writer.Write((byte)packet.ConnectReturnCode);
stream.WriteByte((byte)packet.ConnectReturnCode);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
}

private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttPubRelPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
}

private static byte Serialize(MqttPublishPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttPublishPacket packet, Stream stream)
{
ValidatePublishPacket(packet);

writer.WriteWithLengthPrefix(packet.Topic);
stream.WriteWithLengthPrefix(packet.Topic);

if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
@@ -436,7 +427,7 @@ namespace MQTTnet.Serializer
throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);
}
else
{
@@ -448,7 +439,7 @@ namespace MQTTnet.Serializer

if (packet.Payload?.Length > 0)
{
writer.Write(packet.Payload);
stream.Write(packet.Payload, 0, packet.Payload.Length);
}

byte fixedHeader = 0;
@@ -468,43 +459,43 @@ namespace MQTTnet.Serializer
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
}

private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttPubAckPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
}

private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttPubRecPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
}

private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttPubCompPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
}

private static byte Serialize(MqttSubscribePacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttSubscribePacket packet, Stream stream)
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

@@ -513,41 +504,41 @@ namespace MQTTnet.Serializer
throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

if (packet.TopicFilters?.Count > 0)
{
foreach (var topicFilter in packet.TopicFilters)
{
writer.WriteWithLengthPrefix(topicFilter.Topic);
writer.Write((byte)topicFilter.QualityOfServiceLevel);
stream.WriteWithLengthPrefix(topicFilter.Topic);
stream.WriteByte((byte)topicFilter.QualityOfServiceLevel);
}
}

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
}

private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttSubAckPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

if (packet.SubscribeReturnCodes?.Any() == true)
{
foreach (var packetSubscribeReturnCode in packet.SubscribeReturnCodes)
{
writer.Write((byte)packetSubscribeReturnCode);
stream.WriteByte((byte)packetSubscribeReturnCode);
}
}

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
}

private static byte Serialize(MqttUnsubscribePacket packet, MqttPacketWriter writer)
private static byte Serialize(MqttUnsubscribePacket packet, Stream stream)
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

@@ -556,27 +547,27 @@ namespace MQTTnet.Serializer
throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);

if (packet.TopicFilters?.Any() == true)
{
foreach (var topicFilter in packet.TopicFilters)
{
writer.WriteWithLengthPrefix(topicFilter);
stream.WriteWithLengthPrefix(topicFilter);
}
}

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
}

private static byte Serialize(MqttUnsubAckPacket packet, BinaryWriter writer)
private static byte Serialize(MqttUnsubAckPacket packet, Stream stream)
{
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
stream.Write(packet.PacketIdentifier.Value);
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
}



+ 11
- 25
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketWriter.cs View File

@@ -5,13 +5,8 @@ using MQTTnet.Protocol;

namespace MQTTnet.Serializer
{
public sealed class MqttPacketWriter : BinaryWriter
public static class MqttPacketWriter
{
public MqttPacketWriter(Stream stream)
: base(stream, Encoding.UTF8, true)
{
}

public static byte BuildFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{
var fixedHeader = (int)packetType << 4;
@@ -19,40 +14,31 @@ namespace MQTTnet.Serializer
return (byte)fixedHeader;
}

public override void Write(ushort value)
public static void Write(this Stream stream, ushort value)
{
var buffer = BitConverter.GetBytes(value);
Write(buffer[1], buffer[0]);
}

public new void Write(params byte[] values)
{
base.Write(values);
}

public new void Write(byte value)
{
base.Write(value);
stream.WriteByte(buffer[1]);
stream.WriteByte(buffer[0]);
}

public void Write(ByteWriter value)
public static void Write(this Stream stream, ByteWriter value)
{
if (value == null) throw new ArgumentNullException(nameof(value));

Write(value.Value);
stream.WriteByte(value.Value);
}

public void WriteWithLengthPrefix(string value)
public static void WriteWithLengthPrefix(this Stream stream, string value)
{
WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty));
stream.WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value ?? string.Empty));
}

public void WriteWithLengthPrefix(byte[] value)
public static void WriteWithLengthPrefix(this Stream stream, byte[] value)
{
var length = (ushort)value.Length;

Write(length);
Write(value);
stream.Write(length);
stream.Write(value, 0, length);
}

public static int EncodeRemainingLength(int length, MemoryStream stream)


Loading…
Cancel
Save