@@ -11,12 +11,7 @@ | |||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | |||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | |||
<releaseNotes> | |||
* [Core] Fixed missing properties from PUBLISH packet in _MqttApplicationMessage_ (thanks to @pcbing). | |||
* [Core] Fixed wrong encoding of PUBREL and PUBCOMP packets for MQTTv5 (thanks to @perphilipp). | |||
* [Client] Added the authentication result to the disconnected handler (only set when connecting failed). | |||
* [Client] Added new overloads for _MqttClientOptionsBuilder_. | |||
* [Server] Fixed a bug which returns wrong flag for existing session in CONNACK packet (thanks to @avengerstark). | |||
* [nuget] .NET Framework builds are now using 4.5.2 or 4.6.1 builds instead of netstandard 2.0. | |||
* [Core] Fixed issues in MQTTv5 message encoding and decoding. | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2019</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
@@ -469,6 +469,9 @@ namespace MQTTnet.Client | |||
} | |||
else if (packet is MqttDisconnectPacket) | |||
{ | |||
// Also dispatch disconnect to waiting threads to generate a proper exception. | |||
_packetDispatcher.Dispatch(packet); | |||
await DisconnectAsync(null, cancellationToken).ConfigureAwait(false); | |||
} | |||
else | |||
@@ -7,6 +7,8 @@ namespace MQTTnet.Client.Subscribing | |||
{ | |||
public List<TopicFilter> TopicFilters { get; set; } = new List<TopicFilter>(); | |||
public uint? SubscriptionIdentifier { get; set; } | |||
public List<MqttUserProperty> UserProperties { get; set; } = new List<MqttUserProperty>(); | |||
} | |||
} |
@@ -0,0 +1,29 @@ | |||
using System.Collections.Generic; | |||
using MQTTnet.Packets; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Exceptions | |||
{ | |||
public class MqttUnexpectedDisconnectReceivedException : MqttCommunicationException | |||
{ | |||
public MqttUnexpectedDisconnectReceivedException(MqttDisconnectPacket disconnectPacket) | |||
: base($"Unexpected DISCONNECT (Reason code={disconnectPacket.ReasonCode}) received.") | |||
{ | |||
ReasonCode = disconnectPacket.ReasonCode; | |||
SessionExpiryInterval = disconnectPacket.Properties?.SessionExpiryInterval; | |||
ReasonString = disconnectPacket.Properties?.ReasonString; | |||
ServerReference = disconnectPacket.Properties?.ServerReference; | |||
UserProperties = disconnectPacket.Properties?.UserProperties; | |||
} | |||
public MqttDisconnectReasonCode? ReasonCode { get; } | |||
public uint? SessionExpiryInterval { get; } | |||
public string ReasonString { get; } | |||
public List<MqttUserProperty> UserProperties { get; } | |||
public string ServerReference { get; } | |||
} | |||
} |
@@ -8,7 +8,6 @@ | |||
void Write(byte returnCode); | |||
void WriteWithLengthPrefix(byte[] payload); | |||
void Write(ushort keepAlivePeriod); | |||
void Write(IMqttPacketWriter propertyWriter); | |||
void WriteVariableLengthInteger(uint length); | |||
void Write(byte[] payload, int v, int length); | |||
@@ -33,13 +33,14 @@ namespace MQTTnet.Formatter.V5 | |||
MessageExpiryInterval = applicationMessage.MessageExpiryInterval, | |||
PayloadFormatIndicator = applicationMessage.PayloadFormatIndicator, | |||
ResponseTopic = applicationMessage.ResponseTopic, | |||
SubscriptionIdentifier = applicationMessage.SubscriptionIdentifier, | |||
SubscriptionIdentifiers = applicationMessage.SubscriptionIdentifiers, | |||
TopicAlias = applicationMessage.TopicAlias | |||
} | |||
}; | |||
if (applicationMessage.UserProperties != null) | |||
{ | |||
packet.Properties.UserProperties = new List<MqttUserProperty>(); | |||
packet.Properties.UserProperties.AddRange(applicationMessage.UserProperties); | |||
} | |||
@@ -67,7 +68,7 @@ namespace MQTTnet.Formatter.V5 | |||
ContentType = publishPacket.Properties?.ContentType, | |||
CorrelationData = publishPacket.Properties?.CorrelationData, | |||
MessageExpiryInterval = publishPacket.Properties?.MessageExpiryInterval, | |||
SubscriptionIdentifier = publishPacket.Properties?.SubscriptionIdentifier, | |||
SubscriptionIdentifiers = publishPacket.Properties?.SubscriptionIdentifiers, | |||
TopicAlias = publishPacket.Properties?.TopicAlias, | |||
PayloadFormatIndicator = publishPacket.Properties?.PayloadFormatIndicator, | |||
UserProperties = publishPacket.Properties?.UserProperties ?? new List<MqttUserProperty>() | |||
@@ -158,6 +159,7 @@ namespace MQTTnet.Formatter.V5 | |||
}; | |||
packet.TopicFilters.AddRange(options.TopicFilters); | |||
packet.Properties.SubscriptionIdentifier = options.SubscriptionIdentifier; | |||
packet.Properties.UserProperties.AddRange(options.UserProperties); | |||
return packet; | |||
@@ -1,4 +1,5 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Exceptions; | |||
@@ -91,10 +92,6 @@ namespace MQTTnet.Formatter.V5 | |||
{ | |||
packet.Properties.AuthenticationMethod = propertiesReader.ReadAuthenticationMethod(); | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.WillDelayInterval) | |||
{ | |||
packet.Properties.WillDelayInterval = propertiesReader.ReadWillDelayInterval(); | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AuthenticationData) | |||
{ | |||
packet.Properties.AuthenticationData = propertiesReader.ReadAuthenticationData(); | |||
@@ -121,11 +118,11 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnAckPacket)); | |||
propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttConnectPacket)); | |||
} | |||
} | |||
@@ -133,6 +130,63 @@ namespace MQTTnet.Formatter.V5 | |||
if (packet.WillMessage != null) | |||
{ | |||
var willPropertiesReader = new MqttV500PropertiesReader(body); | |||
while (willPropertiesReader.MoveNext()) | |||
{ | |||
if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.PayloadFormatIndicator) | |||
{ | |||
packet.WillMessage.PayloadFormatIndicator = propertiesReader.ReadPayloadFormatIndicator(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.MessageExpiryInterval) | |||
{ | |||
packet.WillMessage.MessageExpiryInterval = propertiesReader.ReadMessageExpiryInterval(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.TopicAlias) | |||
{ | |||
packet.WillMessage.TopicAlias = propertiesReader.ReadTopicAlias(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ResponseTopic) | |||
{ | |||
packet.WillMessage.ResponseTopic = propertiesReader.ReadResponseTopic(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.CorrelationData) | |||
{ | |||
packet.WillMessage.CorrelationData = propertiesReader.ReadCorrelationData(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier) | |||
{ | |||
if (packet.WillMessage.SubscriptionIdentifiers == null) | |||
{ | |||
packet.WillMessage.SubscriptionIdentifiers = new List<uint>(); | |||
} | |||
packet.WillMessage.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier()); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.ContentType) | |||
{ | |||
packet.WillMessage.ContentType = propertiesReader.ReadContentType(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.WillDelayInterval) | |||
{ | |||
// This is a special case! | |||
packet.Properties.WillDelayInterval = propertiesReader.ReadWillDelayInterval(); | |||
} | |||
else if (willPropertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
if (packet.WillMessage.UserProperties == null) | |||
{ | |||
packet.WillMessage.UserProperties = new List<MqttUserProperty>(); | |||
} | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
propertiesReader.ThrowInvalidPropertyIdException(typeof(MqttPublishPacket)); | |||
} | |||
} | |||
packet.WillMessage.Topic = body.ReadStringWithLengthPrefix(); | |||
packet.WillMessage.Payload = body.ReadWithLengthPrefix(); | |||
} | |||
@@ -186,7 +240,7 @@ namespace MQTTnet.Formatter.V5 | |||
{ | |||
packet.Properties.ReceiveMaximum = propertiesReader.ReadReceiveMaximum(); | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AssignedClientIdentifer) | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.AssignedClientIdentifier) | |||
{ | |||
packet.Properties.AssignedClientIdentifier = propertiesReader.ReadAssignedClientIdentifier(); | |||
} | |||
@@ -228,7 +282,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -266,7 +320,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -296,7 +350,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -346,7 +400,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -378,7 +432,7 @@ namespace MQTTnet.Formatter.V5 | |||
{ | |||
if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -413,7 +467,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -487,7 +541,12 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.SubscriptionIdentifier) | |||
{ | |||
packet.Properties.SubscriptionIdentifier = propertiesReader.ReadSubscriptionIdentifier(); | |||
if (packet.Properties.SubscriptionIdentifiers == null) | |||
{ | |||
packet.Properties.SubscriptionIdentifiers = new List<uint>(); | |||
} | |||
packet.Properties.SubscriptionIdentifiers.Add(propertiesReader.ReadSubscriptionIdentifier()); | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.ContentType) | |||
{ | |||
@@ -495,7 +554,12 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
if (packet.Properties.UserProperties == null) | |||
{ | |||
packet.Properties.UserProperties = new List<MqttUserProperty>(); | |||
} | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -538,7 +602,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -576,7 +640,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -614,7 +678,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -652,7 +716,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -697,7 +761,7 @@ namespace MQTTnet.Formatter.V5 | |||
} | |||
else if (propertiesReader.CurrentPropertyId == MqttPropertyId.UserProperty) | |||
{ | |||
propertiesReader.FillUserProperties(packet.Properties.UserProperties); | |||
propertiesReader.AddUserPropertyTo(packet.Properties.UserProperties); | |||
} | |||
else | |||
{ | |||
@@ -127,24 +127,38 @@ namespace MQTTnet.Formatter.V5 | |||
var propertiesWriter = new MqttV500PropertiesWriter(); | |||
if (packet.Properties != null) | |||
{ | |||
propertiesWriter.WriteWillDelayInterval(packet.Properties.WillDelayInterval); | |||
propertiesWriter.WriteSessionExpiryInterval(packet.Properties.SessionExpiryInterval); | |||
propertiesWriter.WriteAuthenticationMethod(packet.Properties.AuthenticationMethod); | |||
propertiesWriter.WriteAuthenticationData(packet.Properties.AuthenticationData); | |||
propertiesWriter.WriteRequestProblemInformation(packet.Properties.RequestProblemInformation); | |||
propertiesWriter.WriteRequestResponseInformation(packet.Properties.RequestResponseInformation); | |||
propertiesWriter.WriteReceiveMaximum(packet.Properties.ReceiveMaximum); | |||
propertiesWriter.WriteTopicAlias(packet.Properties.TopicAliasMaximum); | |||
propertiesWriter.WriteTopicAliasMaximum(packet.Properties.TopicAliasMaximum); | |||
propertiesWriter.WriteMaximumPacketSize(packet.Properties.MaximumPacketSize); | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
packetWriter.WriteWithLengthPrefix(packet.ClientId); | |||
if (packet.WillMessage != null) | |||
{ | |||
var willPropertiesWriter = new MqttV500PropertiesWriter(); | |||
willPropertiesWriter.WritePayloadFormatIndicator(packet.WillMessage.PayloadFormatIndicator); | |||
willPropertiesWriter.WriteMessageExpiryInterval(packet.WillMessage.MessageExpiryInterval); | |||
willPropertiesWriter.WriteTopicAlias(packet.WillMessage.TopicAlias); | |||
willPropertiesWriter.WriteResponseTopic(packet.WillMessage.ResponseTopic); | |||
willPropertiesWriter.WriteCorrelationData(packet.WillMessage.CorrelationData); | |||
willPropertiesWriter.WriteSubscriptionIdentifiers(packet.WillMessage.SubscriptionIdentifiers); | |||
willPropertiesWriter.WriteContentType(packet.WillMessage.ContentType); | |||
willPropertiesWriter.WriteUserProperties(packet.WillMessage.UserProperties); | |||
// This is a special case! | |||
willPropertiesWriter.WriteWillDelayInterval(packet.Properties?.WillDelayInterval); | |||
willPropertiesWriter.WriteTo(packetWriter); | |||
packetWriter.WriteWithLengthPrefix(packet.WillMessage.Topic); | |||
packetWriter.WriteWithLengthPrefix(packet.WillMessage.Payload); | |||
} | |||
@@ -202,7 +216,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck); | |||
} | |||
@@ -244,12 +258,12 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteTopicAlias(packet.Properties.TopicAlias); | |||
propertiesWriter.WriteResponseTopic(packet.Properties.ResponseTopic); | |||
propertiesWriter.WriteCorrelationData(packet.Properties.CorrelationData); | |||
propertiesWriter.WriteSubscriptionIdentifier(packet.Properties.SubscriptionIdentifier); | |||
propertiesWriter.WriteSubscriptionIdentifiers(packet.Properties.SubscriptionIdentifiers); | |||
propertiesWriter.WriteContentType(packet.Properties.ContentType); | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
if (packet.Payload?.Length > 0) | |||
{ | |||
@@ -300,7 +314,7 @@ namespace MQTTnet.Formatter.V5 | |||
if (packetWriter.Length > 0 || packet.ReasonCode.Value != MqttPubAckReasonCode.Success) | |||
{ | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
} | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck); | |||
@@ -328,7 +342,7 @@ namespace MQTTnet.Formatter.V5 | |||
if (packetWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRecReasonCode.Success) | |||
{ | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
} | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec); | |||
@@ -355,7 +369,7 @@ namespace MQTTnet.Formatter.V5 | |||
if (propertiesWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRelReasonCode.Success) | |||
{ | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
} | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02); | |||
@@ -382,7 +396,7 @@ namespace MQTTnet.Formatter.V5 | |||
if (propertiesWriter.Length > 0 || packet.ReasonCode.Value != MqttPubCompReasonCode.Success) | |||
{ | |||
packetWriter.Write((byte)packet.ReasonCode.Value); | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
} | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp); | |||
@@ -403,7 +417,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
if (packet.TopicFilters?.Count > 0) | |||
{ | |||
@@ -450,7 +464,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
foreach (var reasonCode in packet.ReasonCodes) | |||
{ | |||
@@ -474,7 +488,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
foreach (var topicFilter in packet.TopicFilters) | |||
{ | |||
@@ -499,7 +513,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
foreach (var reasonCode in packet.ReasonCodes) | |||
{ | |||
@@ -527,7 +541,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Disconnect); | |||
} | |||
@@ -555,7 +569,7 @@ namespace MQTTnet.Formatter.V5 | |||
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties); | |||
} | |||
propertiesWriter.WriteToPacket(packetWriter); | |||
propertiesWriter.WriteTo(packetWriter); | |||
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Auth); | |||
} | |||
@@ -1,6 +1,5 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Packets; | |||
using MQTTnet.Protocol; | |||
@@ -43,24 +42,14 @@ namespace MQTTnet.Formatter.V5 | |||
return true; | |||
} | |||
public void FillUserProperties(List<MqttUserProperty> userProperties) | |||
public void AddUserPropertyTo(List<MqttUserProperty> userProperties) | |||
{ | |||
if (userProperties == null) throw new ArgumentNullException(nameof(userProperties)); | |||
var userPropertiesLength = _body.ReadVariableLengthInteger(); | |||
if (userPropertiesLength == 0) | |||
{ | |||
return; | |||
} | |||
var name = _body.ReadStringWithLengthPrefix(); | |||
var value = _body.ReadStringWithLengthPrefix(); | |||
var targetPosition = _body.Offset + userPropertiesLength; | |||
while (_body.Offset < targetPosition) | |||
{ | |||
var name = _body.ReadStringWithLengthPrefix(); | |||
var value = _body.ReadStringWithLengthPrefix(); | |||
userProperties.Add(new MqttUserProperty(name, value)); | |||
} | |||
userProperties.Add(new MqttUserProperty(name, value)); | |||
} | |||
public string ReadReasonString() | |||
@@ -78,17 +67,17 @@ namespace MQTTnet.Formatter.V5 | |||
return _body.ReadWithLengthPrefix(); | |||
} | |||
public bool? ReadRetainAvailable() | |||
public bool ReadRetainAvailable() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
public uint? ReadSessionExpiryInterval() | |||
public uint ReadSessionExpiryInterval() | |||
{ | |||
return _body.ReadFourByteInteger(); | |||
} | |||
public ushort? ReadReceiveMaximum() | |||
public ushort ReadReceiveMaximum() | |||
{ | |||
return _body.ReadTwoByteInteger(); | |||
} | |||
@@ -103,17 +92,17 @@ namespace MQTTnet.Formatter.V5 | |||
return _body.ReadStringWithLengthPrefix(); | |||
} | |||
public ushort? ReadTopicAliasMaximum() | |||
public ushort ReadTopicAliasMaximum() | |||
{ | |||
return _body.ReadTwoByteInteger(); | |||
} | |||
public uint? ReadMaximumPacketSize() | |||
public uint ReadMaximumPacketSize() | |||
{ | |||
return _body.ReadFourByteInteger(); | |||
} | |||
public ushort? ReadServerKeepAlive() | |||
public ushort ReadServerKeepAlive() | |||
{ | |||
return _body.ReadTwoByteInteger(); | |||
} | |||
@@ -123,22 +112,22 @@ namespace MQTTnet.Formatter.V5 | |||
return _body.ReadStringWithLengthPrefix(); | |||
} | |||
public bool? ReadSharedSubscriptionAvailable() | |||
public bool ReadSharedSubscriptionAvailable() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
public bool? ReadSubscriptionIdentifiersAvailable() | |||
public bool ReadSubscriptionIdentifiersAvailable() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
public bool? ReadWildcardSubscriptionAvailable() | |||
public bool ReadWildcardSubscriptionAvailable() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
public uint? ReadSubscriptionIdentifier() | |||
public uint ReadSubscriptionIdentifier() | |||
{ | |||
return _body.ReadVariableLengthInteger(); | |||
} | |||
@@ -148,12 +137,12 @@ namespace MQTTnet.Formatter.V5 | |||
return (MqttPayloadFormatIndicator)_body.ReadByte(); | |||
} | |||
public uint? ReadMessageExpiryInterval() | |||
public uint ReadMessageExpiryInterval() | |||
{ | |||
return _body.ReadFourByteInteger(); | |||
} | |||
public ushort? ReadTopicAlias() | |||
public ushort ReadTopicAlias() | |||
{ | |||
return _body.ReadTwoByteInteger(); | |||
} | |||
@@ -173,17 +162,17 @@ namespace MQTTnet.Formatter.V5 | |||
return _body.ReadStringWithLengthPrefix(); | |||
} | |||
public uint? ReadWillDelayInterval() | |||
public uint ReadWillDelayInterval() | |||
{ | |||
return _body.ReadFourByteInteger(); | |||
} | |||
public bool? RequestResponseInformation() | |||
public bool RequestResponseInformation() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
public bool? RequestProblemInformation() | |||
public bool RequestProblemInformation() | |||
{ | |||
return _body.ReadBoolean(); | |||
} | |||
@@ -19,16 +19,12 @@ namespace MQTTnet.Formatter.V5 | |||
return; | |||
} | |||
var propertyWriter = new MqttPacketWriter(); | |||
foreach (var property in userProperties) | |||
{ | |||
propertyWriter.WriteWithLengthPrefix(property.Name); | |||
propertyWriter.WriteWithLengthPrefix(property.Value); | |||
_packetWriter.Write((byte)MqttPropertyId.UserProperty); | |||
_packetWriter.WriteWithLengthPrefix(property.Name); | |||
_packetWriter.WriteWithLengthPrefix(property.Value); | |||
} | |||
_packetWriter.Write((byte)MqttPropertyId.UserProperty); | |||
_packetWriter.WriteVariableLengthInteger((uint)propertyWriter.Length); | |||
_packetWriter.Write(propertyWriter); | |||
} | |||
public void WriteCorrelationData(byte[] value) | |||
@@ -66,7 +62,7 @@ namespace MQTTnet.Formatter.V5 | |||
Write(MqttPropertyId.AuthenticationMethod, value); | |||
} | |||
public void WriteToPacket(IMqttPacketWriter packetWriter) | |||
public void WriteTo(IMqttPacketWriter packetWriter) | |||
{ | |||
if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter)); | |||
@@ -84,6 +80,19 @@ namespace MQTTnet.Formatter.V5 | |||
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, value); | |||
} | |||
public void WriteSubscriptionIdentifiers(IEnumerable<uint> value) | |||
{ | |||
if (value == null) | |||
{ | |||
return; | |||
} | |||
foreach (var subscriptionIdentifier in value) | |||
{ | |||
WriteAsVariableLengthInteger(MqttPropertyId.SubscriptionIdentifier, subscriptionIdentifier); | |||
} | |||
} | |||
public void WriteTopicAlias(ushort? value) | |||
{ | |||
Write(MqttPropertyId.TopicAlias, value); | |||
@@ -121,7 +130,7 @@ namespace MQTTnet.Formatter.V5 | |||
public void WriteReceiveMaximum(ushort? value) | |||
{ | |||
Write(MqttPropertyId.RequestResponseInformation, value); | |||
Write(MqttPropertyId.ReceiveMaximum, value); | |||
} | |||
public void WriteMaximumPacketSize(uint? value) | |||
@@ -136,7 +145,7 @@ namespace MQTTnet.Formatter.V5 | |||
public void WriteAssignedClientIdentifier(string value) | |||
{ | |||
Write(MqttPropertyId.AssignedClientIdentifer, value); | |||
Write(MqttPropertyId.AssignedClientIdentifier, value); | |||
} | |||
public void WriteTopicAliasMaximum(ushort? value) | |||
@@ -180,6 +189,17 @@ namespace MQTTnet.Formatter.V5 | |||
_packetWriter.Write(value.Value ? (byte)0x1 : (byte)0x0); | |||
} | |||
private void Write(MqttPropertyId id, byte? value) | |||
{ | |||
if (!value.HasValue) | |||
{ | |||
return; | |||
} | |||
_packetWriter.Write((byte)id); | |||
_packetWriter.Write(value.Value); | |||
} | |||
private void Write(MqttPropertyId id, ushort? value) | |||
{ | |||
if (!value.HasValue) | |||
@@ -28,6 +28,6 @@ namespace MQTTnet | |||
public byte[] CorrelationData { get; set; } | |||
public uint? SubscriptionIdentifier { get; set; } | |||
public List<uint> SubscriptionIdentifiers { get; set; } | |||
} | |||
} |
@@ -18,7 +18,7 @@ namespace MQTTnet | |||
private string _responseTopic; | |||
private byte[] _correlationData; | |||
private ushort? _topicAlias; | |||
private uint? _subscriptionIdentifier; | |||
private List<uint> _subscriptionIdentifiers; | |||
private uint? _messageExpiryInterval; | |||
private MqttPayloadFormatIndicator? _payloadFormatIndicator; | |||
private List<MqttUserProperty> _userProperties; | |||
@@ -43,7 +43,7 @@ namespace MQTTnet | |||
{ | |||
_payload = payload.ToArray(); | |||
} | |||
return this; | |||
} | |||
@@ -176,7 +176,12 @@ namespace MQTTnet | |||
/// </summary> | |||
public MqttApplicationMessageBuilder WithSubscriptionIdentifier(uint subscriptionIdentifier) | |||
{ | |||
_subscriptionIdentifier = subscriptionIdentifier; | |||
if (_subscriptionIdentifiers == null) | |||
{ | |||
_subscriptionIdentifiers = new List<uint>(); | |||
} | |||
_subscriptionIdentifiers.Add(subscriptionIdentifier); | |||
return this; | |||
} | |||
@@ -215,16 +220,12 @@ namespace MQTTnet | |||
ResponseTopic = _responseTopic, | |||
CorrelationData = _correlationData, | |||
TopicAlias = _topicAlias, | |||
SubscriptionIdentifier = _subscriptionIdentifier, | |||
SubscriptionIdentifiers = _subscriptionIdentifiers, | |||
MessageExpiryInterval = _messageExpiryInterval, | |||
PayloadFormatIndicator = _payloadFormatIndicator | |||
PayloadFormatIndicator = _payloadFormatIndicator, | |||
UserProperties = _userProperties | |||
}; | |||
if (_userProperties?.Any() == true) | |||
{ | |||
applicationMessage.UserProperties = _userProperties; | |||
} | |||
return applicationMessage; | |||
} | |||
} | |||
@@ -23,6 +23,16 @@ namespace MQTTnet.PacketDispatcher | |||
{ | |||
if (packet == null) throw new ArgumentNullException(nameof(packet)); | |||
if (packet is MqttDisconnectPacket disconnectPacket) | |||
{ | |||
foreach (var packetAwaiter in _packetAwaiters) | |||
{ | |||
packetAwaiter.Value.Fail(new MqttUnexpectedDisconnectReceivedException(disconnectPacket)); | |||
} | |||
return; | |||
} | |||
ushort identifier = 0; | |||
if (packet is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue) | |||
{ | |||
@@ -15,9 +15,9 @@ namespace MQTTnet.Packets | |||
public byte[] CorrelationData { get; set; } | |||
public List<MqttUserProperty> UserProperties { get; } = new List<MqttUserProperty>(); | |||
public List<MqttUserProperty> UserProperties { get; set; } | |||
public uint? SubscriptionIdentifier { get; set; } | |||
public List<uint> SubscriptionIdentifiers { get; set; } | |||
public string ContentType { get; set; } | |||
} | |||
@@ -9,7 +9,7 @@ | |||
CorrelationData = 9, | |||
SubscriptionIdentifier = 11, | |||
SessionExpiryInterval = 17, | |||
AssignedClientIdentifer = 18, | |||
AssignedClientIdentifier = 18, | |||
ServerKeepAlive = 19, | |||
AuthenticationMethod = 21, | |||
AuthenticationData = 22, | |||
@@ -10,12 +10,47 @@ using MQTTnet.Client.Unsubscribing; | |||
using MQTTnet.Formatter; | |||
using MQTTnet.Protocol; | |||
using MQTTnet.Server; | |||
using MQTTnet.Tests.Mockups; | |||
namespace MQTTnet.Tests.MQTTv5 | |||
{ | |||
[TestClass] | |||
public class Client_Tests | |||
{ | |||
[TestMethod] | |||
public async Task Connect_With_New_Mqtt_Features() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
await testEnvironment.StartServerAsync(); | |||
// This test can be also executed against "broker.hivemq.com" to validate package format. | |||
var client = await testEnvironment.ConnectClientAsync( | |||
new MqttClientOptionsBuilder() | |||
//.WithTcpServer("broker.hivemq.com") | |||
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort) | |||
.WithProtocolVersion(MqttProtocolVersion.V500) | |||
.WithTopicAliasMaximum(20) | |||
.WithReceiveMaximum(20) | |||
.WithWillMessage(new MqttApplicationMessageBuilder().WithTopic("abc").Build()) | |||
.WithWillDelayInterval(20) | |||
.Build()); | |||
await client.SubscribeAsync("a"); | |||
await client.PublishAsync(new MqttApplicationMessageBuilder() | |||
.WithTopic("a") | |||
.WithPayload("x") | |||
.WithUserProperty("a", "1") | |||
.WithUserProperty("b", "2") | |||
.WithPayloadFormatIndicator(MqttPayloadFormatIndicator.CharacterData) | |||
.WithAtLeastOnceQoS() | |||
.Build()); | |||
await Task.Delay(500); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Connect() | |||
{ | |||
@@ -63,7 +98,16 @@ namespace MQTTnet.Tests.MQTTv5 | |||
await server.StartAsync(new MqttServerOptions()); | |||
await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithProtocolVersion(MqttProtocolVersion.V500).Build()); | |||
var result = await client.SubscribeAsync("a", MqttQualityOfServiceLevel.AtLeastOnce); | |||
var result = await client.SubscribeAsync(new MqttClientSubscribeOptions() | |||
{ | |||
SubscriptionIdentifier = 1, | |||
TopicFilters = new List<TopicFilter> | |||
{ | |||
new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce} | |||
} | |||
}); | |||
await client.DisconnectAsync(); | |||
Assert.AreEqual(1, result.Items.Count); | |||
@@ -0,0 +1,40 @@ | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using MQTTnet.Client; | |||
using MQTTnet.Client.Options; | |||
using MQTTnet.Formatter; | |||
using MQTTnet.Tests.Mockups; | |||
namespace MQTTnet.Tests.MQTTv5 | |||
{ | |||
[TestClass] | |||
public class Server_Tests | |||
{ | |||
[TestMethod] | |||
public async Task Will_Message_Send() | |||
{ | |||
using (var testEnvironment = new TestEnvironment()) | |||
{ | |||
var receivedMessagesCount = 0; | |||
await testEnvironment.StartServerAsync(); | |||
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); | |||
var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage).WithProtocolVersion(MqttProtocolVersion.V500); | |||
var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); | |||
c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount)); | |||
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); | |||
var c2 = await testEnvironment.ConnectClientAsync(clientOptions); | |||
c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent. | |||
await Task.Delay(1000); | |||
Assert.AreEqual(1, receivedMessagesCount); | |||
} | |||
} | |||
} | |||
} |
@@ -90,12 +90,24 @@ namespace MQTTnet.Tests.Mockups | |||
public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
var client = CreateClient(); | |||
await client.ConnectAsync(options.WithTcpServer("localhost", ServerPort).Build()); | |||
return client; | |||
} | |||
public async Task<IMqttClient> ConnectClientAsync(IMqttClientOptions options) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
var client = CreateClient(); | |||
await client.ConnectAsync(options); | |||
return client; | |||
} | |||
public void ThrowIfLogErrors() | |||
{ | |||
lock (_serverErrors) | |||
@@ -1,4 +1,5 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Linq; | |||
using System.Text; | |||
@@ -234,7 +235,7 @@ namespace MQTTnet.Tests | |||
[TestMethod] | |||
public void SerializeV500_MqttPublishPacket() | |||
{ | |||
var prop = new MqttPublishPacketProperties(); | |||
var prop = new MqttPublishPacketProperties {UserProperties = new List<MqttUserProperty>()}; | |||
prop.ResponseTopic = "/Response"; | |||
@@ -1 +1 @@ | |||
theme: jekyll-theme-modernist | |||
theme: jekyll-theme-hacker |
@@ -14,5 +14,5 @@ build: | |||
verbosity: minimal | |||
test_script: | |||
- cmd: dotnet vstest "%APPVEYOR_BUILD_FOLDER%\Tests\MQTTnet.Core.Tests\bin\Release\netcoreapp2.1\MQTTnet.Core.Tests.dll" | |||
- cmd: dotnet vstest "%APPVEYOR_BUILD_FOLDER%\Tests\MQTTnet.Core.Tests\bin\Release\netcoreapp2.1\MQTTnet.Tests.dll" | |||
- cmd: dotnet vstest "%APPVEYOR_BUILD_FOLDER%\Tests\MQTTnet.AspNetCore.Tests\bin\Release\netcoreapp2.1\MQTTnet.AspNetCore.Tests.dll" |