@@ -27,6 +27,7 @@ | |||
* [Server] Fixed bug in PubRel packet generation (MQTTv5 only). | |||
* [Server] Improved message processing performance (+ ~5%). | |||
* [Server] Fix wrong usage of client session items for undelivered messages. | |||
* [Server] Allow to respond with a reason code in PUBACK/PUBREC (thanks to @muneebmajeed). | |||
Git commit: $gitCommit | |||
</releaseNotes> | |||
@@ -637,25 +637,27 @@ namespace MQTTnet.Client | |||
} | |||
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) | |||
{ | |||
var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); | |||
if (reasonCode != null && Enum.IsDefined(typeof(MqttPubAckReasonCode), reasonCode.Value)) | |||
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); | |||
if (!eventArgs.ProcessingFailed) | |||
{ | |||
await SendAsync(new MqttPubAckPacket | |||
{ | |||
PacketIdentifier = publishPacket.PacketIdentifier, | |||
ReasonCode = (MqttPubAckReasonCode)reasonCode.Value | |||
ReasonCode = (MqttPubAckReasonCode)eventArgs.ReasonCode | |||
}, cancellationToken).ConfigureAwait(false); | |||
} | |||
} | |||
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) | |||
{ | |||
var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); | |||
if (reasonCode != null && Enum.IsDefined(typeof(MqttPubRecReasonCode), reasonCode.Value)) | |||
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); | |||
if (!eventArgs.ProcessingFailed) | |||
{ | |||
await SendAsync(new MqttPubRecPacket | |||
{ | |||
PacketIdentifier = publishPacket.PacketIdentifier, | |||
ReasonCode = (MqttPubRecReasonCode)reasonCode.Value | |||
ReasonCode = (MqttPubRecReasonCode)eventArgs.ReasonCode | |||
}, cancellationToken).ConfigureAwait(false); | |||
} | |||
} | |||
@@ -678,7 +680,7 @@ namespace MQTTnet.Client | |||
{ | |||
if (!_packetDispatcher.TryDispatch(pubRecPacket)) | |||
{ | |||
// The packet is unknown. Probably due to a restart of the client. | |||
// The packet is unknown. Probably due to a restart of the client. | |||
// So wen send this to the server to trigger a full resend of the message. | |||
return SendAsync(new MqttPubRelPacket | |||
{ | |||
@@ -756,22 +758,18 @@ namespace MQTTnet.Client | |||
return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket); | |||
} | |||
async Task<int?> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) | |||
async Task<MqttApplicationMessageReceivedEventArgs> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) | |||
{ | |||
int? reasonCode = 0; | |||
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); | |||
var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); | |||
var handler = ApplicationMessageReceivedHandler; | |||
if (handler != null) | |||
{ | |||
var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); | |||
await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false); | |||
if (eventArgs.ProcessingFailed) | |||
{ | |||
reasonCode = eventArgs.ReasonCode; | |||
} | |||
} | |||
return reasonCode; | |||
return eventArgs; | |||
} | |||
async Task WaitForTaskAsync(Task task, Task sender) | |||
@@ -97,7 +97,7 @@ namespace MQTTnet.Formatter | |||
_buffer[_offset] = (byte)(bufferSize >> 8); | |||
_buffer[_offset + 1] = (byte)bufferSize; | |||
Encoding.UTF8.GetBytes(value, 0, value.Length, _buffer, _offset + 2); | |||
IncreasePosition(bufferSize + 2); | |||
@@ -121,7 +121,7 @@ namespace MQTTnet.Formatter | |||
_buffer[_offset] = (byte)(value.Length >> 8); | |||
_buffer[_offset + 1] = (byte)value.Length; | |||
Array.Copy(value, 0, _buffer, _offset + 2, value.Length); | |||
IncreasePosition(value.Length + 2); | |||
} | |||
@@ -201,8 +201,8 @@ namespace MQTTnet.Formatter | |||
{ | |||
// This method frees the used memory by shrinking the buffer. This is required because the buffer | |||
// is used across several messages. In general this is not a big issue because subsequent Ping packages | |||
// have the same size but a very big publish package with 100 MB of payload will increase the buffer | |||
// a lot and the size will never reduced. So this method tries to find a size which can be held in | |||
// have the same size but a very big publish package with 100 MB of payload will increase the buffer | |||
// a lot and the size will never reduced. So this method tries to find a size which can be held in | |||
// memory for a long time without causing troubles. | |||
if (_buffer.Length < MaxBufferSize) | |||
@@ -212,7 +212,7 @@ namespace MQTTnet.Formatter | |||
Array.Resize(ref _buffer, MaxBufferSize); | |||
} | |||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | |||
void EnsureAdditionalCapacity(int additionalCapacity) | |||
{ | |||
@@ -1,5 +1,4 @@ | |||
using MQTTnet.Protocol; | |||
using System; | |||
using System; | |||
namespace MQTTnet | |||
{ | |||
@@ -17,6 +16,6 @@ namespace MQTTnet | |||
public bool ProcessingFailed { get; set; } | |||
public int? ReasonCode { get; set; } | |||
public MqttApplicationMessageReceivedReasonCode ReasonCode { get; set; } = MqttApplicationMessageReceivedReasonCode.Success; | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
namespace MQTTnet | |||
{ | |||
public enum MqttApplicationMessageReceivedReasonCode | |||
{ | |||
Success = 0, | |||
NoMatchingSubscribers = 16, | |||
UnspecifiedError = 128, | |||
ImplementationSpecificError = 131, | |||
NotAuthorized = 135, | |||
TopicNameInvalid = 144, | |||
PacketIdentifierInUse = 145, | |||
QuotaExceeded = 151, | |||
PayloadFormatInvalid = 153 | |||
} | |||
} |
@@ -64,7 +64,7 @@ namespace MQTTnet | |||
_retainHandling = value; | |||
return this; | |||
} | |||
public MqttTopicFilter Build() | |||
{ | |||
if (string.IsNullOrEmpty(_topic)) | |||
@@ -74,7 +74,7 @@ namespace MQTTnet | |||
return new MqttTopicFilter | |||
{ | |||
Topic = _topic, | |||
Topic = _topic, | |||
QualityOfServiceLevel = _qualityOfServiceLevel, | |||
NoLocal = _noLocal, | |||
RetainAsPublished = _retainAsPublished, | |||