From fbb5cd45a08c84de453e23e7fb637d6fc8f6a4b8 Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 30 Mar 2021 21:06:44 +0200 Subject: [PATCH] Refactor deferred ACK of publish packets. --- Build/MQTTnet.nuspec | 1 + Source/MQTTnet/Client/MqttClient.cs | 52 ++++++++-------- ...MqttApplicationMessageReceivedEventArgs.cs | 61 +++++++++---------- .../Server/MqttServerEventDispatcher.cs | 2 +- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 2 +- 5 files changed, 57 insertions(+), 61 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 21efd37..871655a 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -16,6 +16,7 @@ * [Client] Changed exception types so that proper exceptions are thrown when connecting (#1082). * [Client] Exposed Dup flag in application messages. * [Client] Improved keep alive message sending. +* [Client] Added support for deferred message approval (#1075, thanks to @tkurucsai). * [RpcClient] Fixed an issue when using a custom application message reveived handler (#1093). * [RpcClient] Fixed a race condition when using timeout and cancellation token at the same time (BREAKING CHANGE!). * [Server] Improved keep alive message sending. diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 47ac903..638cf6b 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -230,21 +230,21 @@ namespace MQTTnet.Client switch (applicationMessage.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: - { - return PublishAtMostOnce(publishPacket, cancellationToken); - } + { + return PublishAtMostOnce(publishPacket, cancellationToken); + } case MqttQualityOfServiceLevel.AtLeastOnce: - { - return PublishAtLeastOnceAsync(publishPacket, cancellationToken); - } + { + return PublishAtLeastOnceAsync(publishPacket, cancellationToken); + } case MqttQualityOfServiceLevel.ExactlyOnce: - { - return PublishExactlyOnceAsync(publishPacket, cancellationToken); - } + { + return PublishExactlyOnceAsync(publishPacket, cancellationToken); + } default: - { - throw new NotSupportedException(); - } + { + throw new NotSupportedException(); + } } } @@ -365,7 +365,8 @@ namespace MQTTnet.Client { // This handler must be executed in a new thread because otherwise a dead lock may happen // when trying to reconnect in that handler etc. - Task.Run(() => disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception, authenticateResult, _disconnectReason))).RunInBackground(_logger); + Task.Run(() => disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception, authenticateResult, _disconnectReason))) + .RunInBackground(_logger); } } } @@ -585,9 +586,9 @@ namespace MQTTnet.Client if (!_packetDispatcher.TryDispatch(packet)) { throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time."); + } } } - } catch (Exception exception) { if (_cleanDisconnectInitiated) @@ -641,13 +642,12 @@ namespace MQTTnet.Client } var publishPacket = publishPacketDequeueResult.Item; - var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket, cancellationToken); + var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - if (eventArgs.AutoAcknowledge) + if (eventArgs.AutoAcknowledge) { - await eventArgs.Acknowledge(); + await eventArgs.AcknowledgeAsync(cancellationToken).ConfigureAwait(false); } - } catch (OperationCanceledException) { @@ -659,7 +659,7 @@ namespace MQTTnet.Client } } - internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs) + internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs, CancellationToken cancellationToken) { if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { @@ -670,7 +670,7 @@ namespace MQTTnet.Client if (!eventArgs.ProcessingFailed) { var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); - return SendAsync(pubAckPacket, eventArgs.CancellationToken); + return SendAsync(pubAckPacket, cancellationToken); } } else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -678,7 +678,7 @@ namespace MQTTnet.Client if (!eventArgs.ProcessingFailed) { var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); - return SendAsync(pubRecPacket, eventArgs.CancellationToken); + return SendAsync(pubRecPacket, cancellationToken); } } else @@ -710,7 +710,7 @@ namespace MQTTnet.Client Task ProcessReceivedDisconnectPacket(MqttDisconnectPacket disconnectPacket) { - _disconnectReason = (MqttClientDisconnectReason)(disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.NormalDisconnection); + _disconnectReason = (MqttClientDisconnectReason) (disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.NormalDisconnection); // Also dispatch disconnect to waiting threads to generate a proper exception. _packetDispatcher.FailAll(new MqttUnexpectedDisconnectReceivedException(disconnectPacket)); @@ -755,16 +755,16 @@ namespace MQTTnet.Client var pubRecPacket = await SendAndReceiveAsync(publishPacket, cancellationToken).ConfigureAwait(false); var pubRelPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success); - + var pubCompPacket = await SendAndReceiveAsync(pubRelPacket, cancellationToken).ConfigureAwait(false); return _adapter.PacketFormatterAdapter.DataConverter.CreateClientPublishResult(pubRecPacket, pubCompPacket); } - async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) + async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) { var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); - var eventArgs = new MqttApplicationMessageReceivedEventArgs(this, publishPacket, cancellationToken, Options.ClientId, applicationMessage); + var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage, publishPacket, AcknowledgeReceivedPublishPacket); var handler = ApplicationMessageReceivedHandler; if (handler != null) @@ -813,4 +813,4 @@ namespace MQTTnet.Client return Interlocked.CompareExchange(ref _isDisconnectPending, 1, 0) != 0; } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs index 8e4f2f5..ea54dd5 100644 --- a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs @@ -1,41 +1,30 @@ -using MQTTnet.Client; -using MQTTnet.Implementations; -using MQTTnet.Packets; using System; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Packets; namespace MQTTnet { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { - public MqttApplicationMessageReceivedEventArgs(MqttClient client, MqttPublishPacket publishPacket, CancellationToken cancellationToken, string clientId, MqttApplicationMessage applicationMessage) - { - Client = client ?? throw new ArgumentNullException(nameof(client)); - PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); - CancellationToken = cancellationToken; - acknowledged = 0; - ClientId = clientId; - ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); - AutoAcknowledge = true; - } - - public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) + readonly Func _acknowledgeHandler; + + int _isAcknowledged; + + public MqttApplicationMessageReceivedEventArgs( + string clientId, + MqttApplicationMessage applicationMessage, + MqttPublishPacket publishPacket, + Func acknowledgeHandler) { - acknowledged = 1; ClientId = clientId; ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); - AutoAcknowledge = true; + PublishPacket = publishPacket; + _acknowledgeHandler = acknowledgeHandler; } - internal MqttClient Client { get; } - - internal CancellationToken CancellationToken { get; } - internal MqttPublishPacket PublishPacket { get; } - - int acknowledged; - + /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. @@ -53,21 +42,27 @@ namespace MQTTnet /// This value can be used in user code for custom control flow. /// public bool IsHandled { get; set; } + + /// + /// Gets ir sets whether the library should send MQTT ACK packets automatically if required. + /// + public bool AutoAcknowledge { get; set; } = true; public object Tag { get; set; } - - public bool AutoAcknowledge { get; set; } - - public Task Acknowledge() - { - if (Interlocked.CompareExchange(ref acknowledged, 1, 0) == 0) + + public Task AcknowledgeAsync(CancellationToken cancellationToken) + { + if (_acknowledgeHandler == null) { - return Client.AcknowledgeReceivedPublishPacket(this); + throw new NotSupportedException("Deferred acknowledgement of application message is not yet supported in MQTTnet server."); } - else + + if (Interlocked.CompareExchange(ref _isAcknowledged, 1, 0) == 0) { - return PlatformAbstractionLayer.CompletedTask; + return _acknowledgeHandler(this, cancellationToken); } + + throw new InvalidOperationException("The application message is already acknowledged."); } } } diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index 8a416a7..51f35a4 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -108,7 +108,7 @@ namespace MQTTnet.Server return; } - await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)).ConfigureAwait(false); + await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage, null, null)).ConfigureAwait(false); } catch (Exception exception) { diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index c28fdf9..66bef2e 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -466,7 +466,7 @@ namespace MQTTnet.Tests { var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString()); eventArgs.AutoAcknowledge = false; - Task.Delay(value).ContinueWith(x => eventArgs.Acknowledge()); + Task.Delay(value).ContinueWith(x => eventArgs.AcknowledgeAsync(CancellationToken.None)); System.Diagnostics.Debug.WriteLine($"received {value}"); lock (receivedValues)