diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index cea9db2..b3a4c5b 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -27,6 +27,7 @@ * [Server] Fixed bug in PubRel packet generation (MQTTv5 only). * [Server] Improved message processing performance (+ ~5%). * [Server] Fix wrong usage of client session items for undelivered messages. +* [Server] Allow to respond with a reason code in PUBACK/PUBREC (thanks to @muneebmajeed). Git commit: $gitCommit diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index a415c43..6b454d9 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -637,25 +637,27 @@ namespace MQTTnet.Client } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - if (reasonCode != null && Enum.IsDefined(typeof(MqttPubAckReasonCode), reasonCode.Value)) + var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); + + if (!eventArgs.ProcessingFailed) { await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier, - ReasonCode = (MqttPubAckReasonCode)reasonCode.Value + ReasonCode = (MqttPubAckReasonCode)eventArgs.ReasonCode }, cancellationToken).ConfigureAwait(false); } } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - if (reasonCode != null && Enum.IsDefined(typeof(MqttPubRecReasonCode), reasonCode.Value)) + var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); + + if (!eventArgs.ProcessingFailed) { await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier, - ReasonCode = (MqttPubRecReasonCode)reasonCode.Value + ReasonCode = (MqttPubRecReasonCode)eventArgs.ReasonCode }, cancellationToken).ConfigureAwait(false); } } @@ -678,7 +680,7 @@ namespace MQTTnet.Client { if (!_packetDispatcher.TryDispatch(pubRecPacket)) { - // The packet is unknown. Probably due to a restart of the client. + // The packet is unknown. Probably due to a restart of the client. // So wen send this to the server to trigger a full resend of the message. return SendAsync(new MqttPubRelPacket { @@ -756,22 +758,18 @@ namespace MQTTnet.Client return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket); } - async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) + async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) { - int? reasonCode = 0; var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); + var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); var handler = ApplicationMessageReceivedHandler; if (handler != null) { - var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false); - if (eventArgs.ProcessingFailed) - { - reasonCode = eventArgs.ReasonCode; - } } - return reasonCode; + + return eventArgs; } async Task WaitForTaskAsync(Task task, Task sender) diff --git a/Source/MQTTnet/Formatter/MqttPacketWriter.cs b/Source/MQTTnet/Formatter/MqttPacketWriter.cs index a51ba61..c7ab8ac 100644 --- a/Source/MQTTnet/Formatter/MqttPacketWriter.cs +++ b/Source/MQTTnet/Formatter/MqttPacketWriter.cs @@ -97,7 +97,7 @@ namespace MQTTnet.Formatter _buffer[_offset] = (byte)(bufferSize >> 8); _buffer[_offset + 1] = (byte)bufferSize; - + Encoding.UTF8.GetBytes(value, 0, value.Length, _buffer, _offset + 2); IncreasePosition(bufferSize + 2); @@ -121,7 +121,7 @@ namespace MQTTnet.Formatter _buffer[_offset] = (byte)(value.Length >> 8); _buffer[_offset + 1] = (byte)value.Length; - + Array.Copy(value, 0, _buffer, _offset + 2, value.Length); IncreasePosition(value.Length + 2); } @@ -201,8 +201,8 @@ namespace MQTTnet.Formatter { // This method frees the used memory by shrinking the buffer. This is required because the buffer // is used across several messages. In general this is not a big issue because subsequent Ping packages - // have the same size but a very big publish package with 100 MB of payload will increase the buffer - // a lot and the size will never reduced. So this method tries to find a size which can be held in + // have the same size but a very big publish package with 100 MB of payload will increase the buffer + // a lot and the size will never reduced. So this method tries to find a size which can be held in // memory for a long time without causing troubles. if (_buffer.Length < MaxBufferSize) @@ -212,7 +212,7 @@ namespace MQTTnet.Formatter Array.Resize(ref _buffer, MaxBufferSize); } - + [MethodImpl(MethodImplOptions.AggressiveInlining)] void EnsureAdditionalCapacity(int additionalCapacity) { diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs index 8bbc571..a27f9c7 100644 --- a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs @@ -1,5 +1,4 @@ -using MQTTnet.Protocol; -using System; +using System; namespace MQTTnet { @@ -17,6 +16,6 @@ namespace MQTTnet public bool ProcessingFailed { get; set; } - public int? ReasonCode { get; set; } + public MqttApplicationMessageReceivedReasonCode ReasonCode { get; set; } = MqttApplicationMessageReceivedReasonCode.Success; } } diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedReasonCode.cs b/Source/MQTTnet/MqttApplicationMessageReceivedReasonCode.cs new file mode 100644 index 0000000..b192d31 --- /dev/null +++ b/Source/MQTTnet/MqttApplicationMessageReceivedReasonCode.cs @@ -0,0 +1,15 @@ +namespace MQTTnet +{ + public enum MqttApplicationMessageReceivedReasonCode + { + Success = 0, + NoMatchingSubscribers = 16, + UnspecifiedError = 128, + ImplementationSpecificError = 131, + NotAuthorized = 135, + TopicNameInvalid = 144, + PacketIdentifierInUse = 145, + QuotaExceeded = 151, + PayloadFormatInvalid = 153 + } +} \ No newline at end of file diff --git a/Source/MQTTnet/MqttTopicFilterBuilder.cs b/Source/MQTTnet/MqttTopicFilterBuilder.cs index 1cf8d8f..3bb75ab 100644 --- a/Source/MQTTnet/MqttTopicFilterBuilder.cs +++ b/Source/MQTTnet/MqttTopicFilterBuilder.cs @@ -64,7 +64,7 @@ namespace MQTTnet _retainHandling = value; return this; } - + public MqttTopicFilter Build() { if (string.IsNullOrEmpty(_topic)) @@ -74,7 +74,7 @@ namespace MQTTnet return new MqttTopicFilter { - Topic = _topic, + Topic = _topic, QualityOfServiceLevel = _qualityOfServiceLevel, NoLocal = _noLocal, RetainAsPublished = _retainAsPublished,