Browse Source

Improve payload handling and allocation.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
45c033f401
7 changed files with 57 additions and 53 deletions
  1. +6
    -9
      Source/MQTTnet/Formatter/MqttPacketWriter.cs
  2. +23
    -19
      Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs
  3. +4
    -1
      Source/MQTTnet/Formatter/V5/MqttV500PacketDecoder.cs
  4. +7
    -13
      Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
  5. +1
    -1
      Source/MQTTnet/MqttApplicationMessage.cs
  6. +15
    -9
      Source/MQTTnet/MqttApplicationMessageBuilder.cs
  7. +1
    -1
      Source/MQTTnet/Server/MqttRetainedMessagesManager.cs

+ 6
- 9
Source/MQTTnet/Formatter/MqttPacketWriter.cs View File

@@ -79,24 +79,21 @@ namespace MQTTnet.Formatter
else
{
WriteWithLengthPrefix(Encoding.UTF8.GetBytes(value));
}
}
}

public void WriteWithLengthPrefix(byte[] value)
{
if (value == null) throw new ArgumentNullException(nameof(value));

EnsureAdditionalCapacity(value.Length + 2);

if (value.Length == 0)
if (value == null || value.Length == 0)
{
Write(ZeroTwoByteIntegerArray);
Write(ZeroTwoByteIntegerArray);
}
else
{
EnsureAdditionalCapacity(value.Length + 2);
Write((ushort)value.Length);
Write(value, 0, value.Length);
}
}
}

public void Write(byte @byte)
@@ -131,7 +128,7 @@ namespace MQTTnet.Formatter
Array.Copy(buffer, offset, _buffer, _offset, count);
IncreasePosition(count);
}
public void Write(MqttPacketWriter propertyWriter)
{
if (propertyWriter == null) throw new ArgumentNullException(nameof(propertyWriter));


+ 23
- 19
Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs View File

@@ -56,17 +56,17 @@ namespace MQTTnet.Formatter.V3
case MqttControlPacketType.Connect: return DecodeConnectPacket(receivedMqttPacket.Body);
case MqttControlPacketType.ConnAck: return DecodeConnAckPacket(receivedMqttPacket.Body);
case MqttControlPacketType.Disconnect: return new MqttDisconnectPacket();
case MqttControlPacketType.Publish: return DecodePublish(receivedMqttPacket);
case MqttControlPacketType.PubAck: return DecodePubAck(receivedMqttPacket.Body);
case MqttControlPacketType.PubRec: return DecodePubRec(receivedMqttPacket.Body);
case MqttControlPacketType.PubRel: return DecodePubRel(receivedMqttPacket.Body);
case MqttControlPacketType.PubComp: return DecodePubComp(receivedMqttPacket.Body);
case MqttControlPacketType.Publish: return DecodePublishPacket(receivedMqttPacket);
case MqttControlPacketType.PubAck: return DecodePubAckPacket(receivedMqttPacket.Body);
case MqttControlPacketType.PubRec: return DecodePubRecPacket(receivedMqttPacket.Body);
case MqttControlPacketType.PubRel: return DecodePubRelPacket(receivedMqttPacket.Body);
case MqttControlPacketType.PubComp: return DecodePubCompPacket(receivedMqttPacket.Body);
case MqttControlPacketType.PingReq: return new MqttPingReqPacket();
case MqttControlPacketType.PingResp: return new MqttPingRespPacket();
case MqttControlPacketType.Subscribe: return DecodeSubscribe(receivedMqttPacket.Body);
case MqttControlPacketType.SubAck: return DecodeSubAck(receivedMqttPacket.Body);
case MqttControlPacketType.Unsubscibe: return DecodeUnsubscribe(receivedMqttPacket.Body);
case MqttControlPacketType.UnsubAck: return DecodeUnsubAck(receivedMqttPacket.Body);
case MqttControlPacketType.Subscribe: return DecodeSubscribePacket(receivedMqttPacket.Body);
case MqttControlPacketType.SubAck: return DecodeSubAckPacket(receivedMqttPacket.Body);
case MqttControlPacketType.Unsubscibe: return DecodeUnsubscribePacket(receivedMqttPacket.Body);
case MqttControlPacketType.UnsubAck: return DecodeUnsubAckPacket(receivedMqttPacket.Body);

default: throw new MqttProtocolViolationException($"Packet type ({controlPacketType}) not supported.");
}
@@ -100,7 +100,7 @@ namespace MQTTnet.Formatter.V3
}
}

private static MqttBasePacket DecodeUnsubAck(MqttPacketBodyReader body)
private static MqttBasePacket DecodeUnsubAckPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -110,7 +110,7 @@ namespace MQTTnet.Formatter.V3
};
}

private static MqttBasePacket DecodePubComp(MqttPacketBodyReader body)
private static MqttBasePacket DecodePubCompPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -120,7 +120,7 @@ namespace MQTTnet.Formatter.V3
};
}

private static MqttBasePacket DecodePubRel(MqttPacketBodyReader body)
private static MqttBasePacket DecodePubRelPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -130,7 +130,7 @@ namespace MQTTnet.Formatter.V3
};
}

private static MqttBasePacket DecodePubRec(MqttPacketBodyReader body)
private static MqttBasePacket DecodePubRecPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -140,7 +140,7 @@ namespace MQTTnet.Formatter.V3
};
}

private static MqttBasePacket DecodePubAck(MqttPacketBodyReader body)
private static MqttBasePacket DecodePubAckPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -150,7 +150,7 @@ namespace MQTTnet.Formatter.V3
};
}

private static MqttBasePacket DecodeUnsubscribe(MqttPacketBodyReader body)
private static MqttBasePacket DecodeUnsubscribePacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -167,7 +167,7 @@ namespace MQTTnet.Formatter.V3
return packet;
}

private static MqttBasePacket DecodeSubscribe(MqttPacketBodyReader body)
private static MqttBasePacket DecodeSubscribePacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);

@@ -190,7 +190,7 @@ namespace MQTTnet.Formatter.V3
return packet;
}

private static MqttBasePacket DecodePublish(ReceivedMqttPacket receivedMqttPacket)
private static MqttBasePacket DecodePublishPacket(ReceivedMqttPacket receivedMqttPacket)
{
ThrowIfBodyIsEmpty(receivedMqttPacket.Body);

@@ -211,11 +211,15 @@ namespace MQTTnet.Formatter.V3
PacketIdentifier = packetIdentifier,
Retain = retain,
Topic = topic,
Payload = receivedMqttPacket.Body.ReadRemainingData().ToArray(),
QualityOfServiceLevel = qualityOfServiceLevel,
Dup = dup
};

if (!receivedMqttPacket.Body.EndOfStream)
{
packet.Payload = receivedMqttPacket.Body.ReadRemainingData().ToArray();
}

return packet;
}

@@ -271,7 +275,7 @@ namespace MQTTnet.Formatter.V3
return packet;
}

private static MqttBasePacket DecodeSubAck(MqttPacketBodyReader body)
private static MqttBasePacket DecodeSubAckPacket(MqttPacketBodyReader body)
{
ThrowIfBodyIsEmpty(body);



+ 4
- 1
Source/MQTTnet/Formatter/V5/MqttV500PacketDecoder.cs View File

@@ -503,7 +503,10 @@ namespace MQTTnet.Formatter.V5
}
}

packet.Payload = body.ReadRemainingData().ToArray();
if (!body.EndOfStream)
{
packet.Payload = body.ReadRemainingData().ToArray();
}

return packet;
}


+ 7
- 13
Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs View File

@@ -380,7 +380,7 @@ namespace MQTTnet.Formatter.V5

private static byte EncodeSubscribePacket(MqttSubscribePacket packet, MqttPacketWriter packetWriter)
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
if (packet.TopicFilters?.Any() != true) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

ThrowIfPacketIdentifierIsInvalid(packet);

@@ -427,12 +427,9 @@ namespace MQTTnet.Formatter.V5

private static byte EncodeSubAckPacket(MqttSubAckPacket packet, MqttPacketWriter packetWriter)
{
ThrowIfPacketIdentifierIsInvalid(packet);
if (packet.ReasonCodes?.Any() != true) throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");

if (packet.ReasonCodes == null || !packet.ReasonCodes.Any())
{
throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");
}
ThrowIfPacketIdentifierIsInvalid(packet);

packetWriter.Write(packet.PacketIdentifier.Value);

@@ -455,7 +452,7 @@ namespace MQTTnet.Formatter.V5

private static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, MqttPacketWriter packetWriter)
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");
if (packet.TopicFilters?.Any() != true) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

ThrowIfPacketIdentifierIsInvalid(packet);

@@ -479,13 +476,10 @@ namespace MQTTnet.Formatter.V5

private static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, MqttPacketWriter packetWriter)
{
ThrowIfPacketIdentifierIsInvalid(packet);

if (packet.ReasonCodes == null || !packet.ReasonCodes.Any())
{
throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");
}
if (packet.ReasonCodes?.Any() != true) throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");

ThrowIfPacketIdentifierIsInvalid(packet);
packetWriter.Write(packet.PacketIdentifier.Value);

var propertiesWriter = new MqttV500PropertiesWriter();


+ 1
- 1
Source/MQTTnet/MqttApplicationMessage.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet

public bool Retain { get; set; }

public List<MqttUserProperty> UserProperties { get; set; } = new List<MqttUserProperty>();
public List<MqttUserProperty> UserProperties { get; set; }

public string ContentType { get; set; }



+ 15
- 9
Source/MQTTnet/MqttApplicationMessageBuilder.cs View File

@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
@@ -11,8 +10,6 @@ namespace MQTTnet
{
public class MqttApplicationMessageBuilder
{
private readonly List<MqttUserProperty> _userProperties = new List<MqttUserProperty>();

private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
private string _topic;
private byte[] _payload;
@@ -24,6 +21,7 @@ namespace MQTTnet
private uint? _subscriptionIdentifier;
private uint? _messageExpiryInterval;
private MqttPayloadFormatIndicator? _payloadFormatIndicator;
private List<MqttUserProperty> _userProperties;

public MqttApplicationMessageBuilder WithTopic(string topic)
{
@@ -70,7 +68,7 @@ namespace MQTTnet

if (payload.Length == 0)
{
_payload = new byte[0];
_payload = null;
}
else
{
@@ -89,7 +87,7 @@ namespace MQTTnet
return this;
}

_payload = string.IsNullOrEmpty(payload) ? new byte[0] : Encoding.UTF8.GetBytes(payload);
_payload = string.IsNullOrEmpty(payload) ? null : Encoding.UTF8.GetBytes(payload);
return this;
}

@@ -128,6 +126,11 @@ namespace MQTTnet
/// </summary>
public MqttApplicationMessageBuilder WithUserProperty(string name, string value)
{
if (_userProperties == null)
{
_userProperties = new List<MqttUserProperty>();
}

_userProperties.Add(new MqttUserProperty(name, value));
return this;
}
@@ -205,7 +208,7 @@ namespace MQTTnet
var applicationMessage = new MqttApplicationMessage
{
Topic = _topic,
Payload = _payload ?? new byte[0],
Payload = _payload,
QualityOfServiceLevel = _qualityOfServiceLevel,
Retain = _retain,
ContentType = _contentType,
@@ -217,8 +220,11 @@ namespace MQTTnet
PayloadFormatIndicator = _payloadFormatIndicator
};

applicationMessage.UserProperties.AddRange(_userProperties);

if (_userProperties?.Any() == true)
{
applicationMessage.UserProperties = _userProperties;
}
return applicationMessage;
}
}


+ 1
- 1
Source/MQTTnet/Server/MqttRetainedMessagesManager.cs View File

@@ -62,7 +62,7 @@ namespace MQTTnet.Server
{
var saveIsRequired = false;

if (applicationMessage.Payload?.Length == 0)
if (applicationMessage.Payload?.Any() != true)
{
saveIsRequired = _messages.Remove(applicationMessage.Topic);
_logger.Verbose("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic);


Loading…
Cancel
Save