Browse Source

Performance refactoring.

release/3.x.x
Christian Kratky 7 years ago
parent
commit
0b0239bcf1
3 changed files with 80 additions and 92 deletions
  1. +2
    -2
      MQTTnet.Core/Serializer/ByteReader.cs
  2. +45
    -57
      MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
  3. +33
    -33
      MQTTnet.Core/Serializer/MqttPacketWriter.cs

+ 2
- 2
MQTTnet.Core/Serializer/ByteReader.cs View File

@@ -24,7 +24,7 @@ namespace MQTTnet.Core.Serializer
return result;
}

public byte Read(int count)
public int Read(int count)
{
if (_index + count > 8)
{
@@ -42,7 +42,7 @@ namespace MQTTnet.Core.Serializer
_index++;
}

return (byte)result;
return result;
}
}
}

+ 45
- 57
MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs View File

@@ -16,86 +16,72 @@ namespace MQTTnet.Core.Serializer
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination));

var connectPacket = packet as MqttConnectPacket;
if (connectPacket != null)
if (packet is MqttConnectPacket connectPacket)
{
return SerializeAsync(connectPacket, destination);
}

var connAckPacket = packet as MqttConnAckPacket;
if (connAckPacket != null)
if (packet is MqttConnAckPacket connAckPacket)
{
return SerializeAsync(connAckPacket, destination);
}

var disconnectPacket = packet as MqttDisconnectPacket;
if (disconnectPacket != null)
if (packet is MqttDisconnectPacket disconnectPacket)
{
return SerializeAsync(disconnectPacket, destination);
}

var pingReqPacket = packet as MqttPingReqPacket;
if (pingReqPacket != null)
if (packet is MqttPingReqPacket pingReqPacket)
{
return SerializeAsync(pingReqPacket, destination);
}

var pingRespPacket = packet as MqttPingRespPacket;
if (pingRespPacket != null)
if (packet is MqttPingRespPacket pingRespPacket)
{
return SerializeAsync(pingRespPacket, destination);
}

var publishPacket = packet as MqttPublishPacket;
if (publishPacket != null)
if (packet is MqttPublishPacket publishPacket)
{
return SerializeAsync(publishPacket, destination);
}

var pubAckPacket = packet as MqttPubAckPacket;
if (pubAckPacket != null)
if (packet is MqttPubAckPacket pubAckPacket)
{
return SerializeAsync(pubAckPacket, destination);
}

var pubRecPacket = packet as MqttPubRecPacket;
if (pubRecPacket != null)
if (packet is MqttPubRecPacket pubRecPacket)
{
return SerializeAsync(pubRecPacket, destination);
}

var pubRelPacket = packet as MqttPubRelPacket;
if (pubRelPacket != null)
if (packet is MqttPubRelPacket pubRelPacket)
{
return SerializeAsync(pubRelPacket, destination);
}

var pubCompPacket = packet as MqttPubCompPacket;
if (pubCompPacket != null)
if (packet is MqttPubCompPacket pubCompPacket)
{
return SerializeAsync(pubCompPacket, destination);
}

var subscribePacket = packet as MqttSubscribePacket;
if (subscribePacket != null)
if (packet is MqttSubscribePacket subscribePacket)
{
return SerializeAsync(subscribePacket, destination);
}

var subAckPacket = packet as MqttSubAckPacket;
if (subAckPacket != null)
if (packet is MqttSubAckPacket subAckPacket)
{
return SerializeAsync(subAckPacket, destination);
}

var unsubscribePacket = packet as MqttUnsubscribePacket;
if (unsubscribePacket != null)
if (packet is MqttUnsubscribePacket unsubscribePacket)
{
return SerializeAsync(unsubscribePacket, destination);
}

var unsubAckPacket = packet as MqttUnsubAckPacket;
if (unsubAckPacket != null)
if (packet is MqttUnsubAckPacket unsubAckPacket)
{
return SerializeAsync(unsubAckPacket, destination);
}
@@ -206,7 +192,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task<MqttBasePacket> DeserializeUnsubscribeAsync(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializeUnsubscribeAsync(MqttPacketReader reader)
{
var packet = new MqttUnsubscribePacket
{
@@ -221,7 +207,7 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private async Task<MqttBasePacket> DeserializeSubscribeAsync(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializeSubscribeAsync(MqttPacketReader reader)
{
var packet = new MqttSubscribePacket
{
@@ -238,7 +224,7 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private async Task<MqttBasePacket> DeserializePublishAsync(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializePublishAsync(MqttPacketReader reader)
{
var fixedHeader = new ByteReader(reader.FixedHeader);
var retain = fixedHeader.Read();
@@ -266,13 +252,10 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader)
{
var packet = new MqttConnectPacket();

await reader.ReadRemainingDataByteAsync();
await reader.ReadRemainingDataByteAsync();

await reader.ReadRemainingDataAsync(2); // Skip 2 bytes
var protocolName = await reader.ReadRemainingDataAsync(4);

if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
@@ -285,7 +268,12 @@ namespace MQTTnet.Core.Serializer

var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved.
packet.CleanSession = connectFlagsReader.Read();

var packet = new MqttConnectPacket
{
CleanSession = connectFlagsReader.Read()
};

var willFlag = connectFlagsReader.Read();
var willQoS = connectFlagsReader.Read(2);
var willRetain = connectFlagsReader.Read();
@@ -318,7 +306,7 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private async Task<MqttBasePacket> DeserializeSubAck(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializeSubAck(MqttPacketReader reader)
{
var packet = new MqttSubAckPacket
{
@@ -333,7 +321,7 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader)
private static async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader)
{
var variableHeader1 = await reader.ReadRemainingDataByteAsync();
var variableHeader2 = await reader.ReadRemainingDataByteAsync();
@@ -347,7 +335,7 @@ namespace MQTTnet.Core.Serializer
return packet;
}

private void ValidateConnectPacket(MqttConnectPacket packet)
private static void ValidateConnectPacket(MqttConnectPacket packet)
{
if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession)
{
@@ -355,7 +343,7 @@ namespace MQTTnet.Core.Serializer
}
}

private void ValidatePublishPacket(MqttPublishPacket packet)
private static void ValidatePublishPacket(MqttPublishPacket packet)
{
if (packet.QualityOfServiceLevel == 0 && packet.Dup)
{
@@ -365,7 +353,7 @@ namespace MQTTnet.Core.Serializer

private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");

private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{
ValidateConnectPacket(packet);

@@ -420,7 +408,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -435,22 +423,22 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
}

private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
}

private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
}

private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
{
ValidatePublishPacket(packet);

@@ -485,7 +473,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -496,7 +484,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -507,7 +495,7 @@ namespace MQTTnet.Core.Serializer
}
}

private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -518,7 +506,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -529,7 +517,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -549,7 +537,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -568,7 +556,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -587,7 +575,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
private static Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -598,7 +586,7 @@ namespace MQTTnet.Core.Serializer
}
}

private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
private static Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{


+ 33
- 33
MQTTnet.Core/Serializer/MqttPacketWriter.cs View File

@@ -11,39 +11,6 @@ namespace MQTTnet.Core.Serializer
{
private readonly MemoryStream _buffer = new MemoryStream(512);

public void InjectFixedHeader(byte fixedHeader)
{
if (_buffer.Length == 0)
{
Write(fixedHeader);
Write(0);
return;
}

var backupBuffer = _buffer.ToArray();
var remainingLength = (int)_buffer.Length;

_buffer.SetLength(0);

_buffer.WriteByte(fixedHeader);

// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = remainingLength;
do
{
var encodedByte = x % 128;
x = x / 128;
if (x > 0)
{
encodedByte = encodedByte | 128;
}

_buffer.WriteByte((byte)encodedByte);
} while (x > 0);

_buffer.Write(backupBuffer, 0, backupBuffer.Length);
}

public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{
var fixedHeader = (byte)((byte)packetType << 4);
@@ -101,5 +68,38 @@ namespace MQTTnet.Core.Serializer
{
_buffer?.Dispose();
}

private void InjectFixedHeader(byte fixedHeader)
{
if (_buffer.Length == 0)
{
Write(fixedHeader);
Write(0);
return;
}

var backupBuffer = _buffer.ToArray();
var remainingLength = (int)_buffer.Length;

_buffer.SetLength(0);

_buffer.WriteByte(fixedHeader);

// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = remainingLength;
do
{
var encodedByte = x % 128;
x = x / 128;
if (x > 0)
{
encodedByte = encodedByte | 128;
}

_buffer.WriteByte((byte)encodedByte);
} while (x > 0);

_buffer.Write(backupBuffer, 0, backupBuffer.Length);
}
}
}

Loading…
Cancel
Save