Browse Source

Allow to respond with a reason code in PUBACK/PUBREC

release/3.x.x
Muneeb Majeed 3 years ago
parent
commit
f8eaf02516
2 changed files with 19 additions and 13 deletions
  1. +15
    -12
      Source/MQTTnet/Client/MqttClient.cs
  2. +4
    -1
      Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs

+ 15
- 12
Source/MQTTnet/Client/MqttClient.cs View File

@@ -637,26 +637,26 @@ namespace MQTTnet.Client
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
if (reasonCode != null && Enum.IsDefined(typeof(MqttPubAckReasonCode), reasonCode.Value))
{ {
await SendAsync(new MqttPubAckPacket await SendAsync(new MqttPubAckPacket
{ {
PacketIdentifier = publishPacket.PacketIdentifier, PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
ReasonCode = (MqttPubAckReasonCode)reasonCode.Value
}, cancellationToken).ConfigureAwait(false); }, cancellationToken).ConfigureAwait(false);
} }
} }
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{ {
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
var reasonCode = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
if (reasonCode != null && Enum.IsDefined(typeof(MqttPubRecReasonCode), reasonCode.Value))
{ {
var pubRecPacket = new MqttPubRecPacket
await SendAsync(new MqttPubRecPacket
{ {
PacketIdentifier = publishPacket.PacketIdentifier, PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};

await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
ReasonCode = (MqttPubRecReasonCode)reasonCode.Value
}, cancellationToken).ConfigureAwait(false);
} }
} }
else else
@@ -756,8 +756,9 @@ namespace MQTTnet.Client
return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket); return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket);
} }


async Task<bool> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
async Task<int?> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
{ {
int? reasonCode = 0;
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);


var handler = ApplicationMessageReceivedHandler; var handler = ApplicationMessageReceivedHandler;
@@ -765,10 +766,12 @@ namespace MQTTnet.Client
{ {
var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage);
await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false); await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false);
return !eventArgs.ProcessingFailed;
if (eventArgs.ProcessingFailed)
{
reasonCode = eventArgs.ReasonCode;
}
} }

return true;
return reasonCode;
} }


async Task WaitForTaskAsync(Task task, Task sender) async Task WaitForTaskAsync(Task task, Task sender)


+ 4
- 1
Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs View File

@@ -1,4 +1,5 @@
using System;
using MQTTnet.Protocol;
using System;


namespace MQTTnet namespace MQTTnet
{ {
@@ -15,5 +16,7 @@ namespace MQTTnet
public MqttApplicationMessage ApplicationMessage { get; } public MqttApplicationMessage ApplicationMessage { get; }


public bool ProcessingFailed { get; set; } public bool ProcessingFailed { get; set; }

public int? ReasonCode { get; set; }
} }
} }

Loading…
Cancel
Save