From f955b4a3e6a5c1685e4c0074c752a323cff0fb13 Mon Sep 17 00:00:00 2001 From: "tamas.kurucsai" Date: Mon, 18 Jan 2021 10:47:21 +0100 Subject: [PATCH 1/5] implemented delayed message acknowledgement --- Source/MQTTnet/Client/MqttClient.cs | 58 ++++++++++++++++--- ...MqttApplicationMessageReceivedEventArgs.cs | 3 + 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index a6901fe..1ba8905 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -630,25 +630,28 @@ namespace MQTTnet.Client } var publishPacket = publishPacketDequeueResult.Item; + var publishResult = await HandleReceivedApplicationMessageAsync(publishPacket); if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { - await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); + // no response required } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) + if (publishResult.ProcessingSucceeded) { - await SendAsync(new MqttPubAckPacket + var pubAckPacket = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier, ReasonCode = MqttPubAckReasonCode.Success - }, cancellationToken).ConfigureAwait(false); + }; + + await PublishResponseForReceivedPublishPacket(publishResult, pubAckPacket, cancellationToken).ConfigureAwait(false); } } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) + if (publishResult.ProcessingSucceeded) { var pubRecPacket = new MqttPubRecPacket { @@ -656,7 +659,7 @@ namespace MQTTnet.Client ReasonCode = MqttPubRecReasonCode.Success }; - await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); + await PublishResponseForReceivedPublishPacket(publishResult, pubRecPacket, cancellationToken).ConfigureAwait(false); } } else @@ -674,6 +677,31 @@ namespace MQTTnet.Client } } + async Task PublishResponseForReceivedPublishPacket(ReceivedApplicationMessageResult publishResult, MqttBasePacket resultPacket, CancellationToken cancellationToken) + { + if (publishResult.PendingTask == null) + { + await SendAsync(resultPacket, cancellationToken).ConfigureAwait(false); + } + else + { + _ = publishResult.PendingTask.ContinueWith(async x => + { + try + { + if (x.Result) await SendAsync(resultPacket, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Error while handling application message."); + } + }); + } + } + Task ProcessReceivedPubRecPacket(MqttPubRecPacket pubRecPacket, CancellationToken cancellationToken) { if (!_packetDispatcher.TryDispatch(pubRecPacket)) @@ -756,7 +784,19 @@ namespace MQTTnet.Client return _adapter.PacketFormatterAdapter.DataConverter.CreatePublishResult(pubRecPacket, pubCompPacket); } - async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) + struct ReceivedApplicationMessageResult + { + public readonly bool ProcessingSucceeded; + public readonly Task PendingTask; + + public ReceivedApplicationMessageResult(bool processingSucceeded, Task pendingTask) + { + this.ProcessingSucceeded = processingSucceeded; + this.PendingTask = pendingTask; + } + } + + async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) { var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); @@ -765,10 +805,10 @@ namespace MQTTnet.Client { var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); await handler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false); - return !eventArgs.ProcessingFailed; + return new ReceivedApplicationMessageResult(!eventArgs.ProcessingFailed, eventArgs.PendingTask); } - return true; + return new ReceivedApplicationMessageResult(true, null); } async Task WaitForTaskAsync(Task task, Task sender) diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs index ed46305..6567306 100644 --- a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace MQTTnet { @@ -15,5 +16,7 @@ namespace MQTTnet public MqttApplicationMessage ApplicationMessage { get; } public bool ProcessingFailed { get; set; } + + public Task PendingTask { get; set; } } } From fcbe2e32f635a08249e34832118ec152d72d5243 Mon Sep 17 00:00:00 2001 From: "tamas.kurucsai" Date: Mon, 18 Jan 2021 17:58:49 +0100 Subject: [PATCH 2/5] corrected name of response sender method --- Source/MQTTnet/Client/MqttClient.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 1ba8905..bbdaca9 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -646,7 +646,7 @@ namespace MQTTnet.Client ReasonCode = MqttPubAckReasonCode.Success }; - await PublishResponseForReceivedPublishPacket(publishResult, pubAckPacket, cancellationToken).ConfigureAwait(false); + await SendResponseForReceivedPublishPacket(publishResult, pubAckPacket, cancellationToken).ConfigureAwait(false); } } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) @@ -659,7 +659,7 @@ namespace MQTTnet.Client ReasonCode = MqttPubRecReasonCode.Success }; - await PublishResponseForReceivedPublishPacket(publishResult, pubRecPacket, cancellationToken).ConfigureAwait(false); + await SendResponseForReceivedPublishPacket(publishResult, pubRecPacket, cancellationToken).ConfigureAwait(false); } } else @@ -677,7 +677,7 @@ namespace MQTTnet.Client } } - async Task PublishResponseForReceivedPublishPacket(ReceivedApplicationMessageResult publishResult, MqttBasePacket resultPacket, CancellationToken cancellationToken) + async Task SendResponseForReceivedPublishPacket(ReceivedApplicationMessageResult publishResult, MqttBasePacket resultPacket, CancellationToken cancellationToken) { if (publishResult.PendingTask == null) { From 822f582e4184d58949a05541221fcd7900b1c6ad Mon Sep 17 00:00:00 2001 From: "tamas.kurucsai" Date: Fri, 29 Jan 2021 07:03:19 +0100 Subject: [PATCH 3/5] added test for delayed message acknowledgement --- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 48 ++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index c4fdb22..36daed4 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -445,6 +445,54 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Preserve_Message_Order_With_Delayed_Acknowledgement() + { + // The messages are sent in reverse or to ensure that the delay in the handler + // needs longer for the first messages and later messages may be processed earlier (if there + // is an issue). + const int MessagesCount = 50; + + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServerAsync(); + + var client1 = await testEnvironment.ConnectClientAsync(); + await client1.SubscribeAsync("x", MqttQualityOfServiceLevel.ExactlyOnce); + + var receivedValues = new List(); + + Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) + { + var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString()); + eventArgs.PendingTask = Task.Delay(value).ContinueWith(x => true); + + System.Diagnostics.Debug.WriteLine($"received {value}"); + lock (receivedValues) + { + receivedValues.Add(value); + } + + return Task.CompletedTask; + } + + client1.UseApplicationMessageReceivedHandler(Handler1); + + var client2 = await testEnvironment.ConnectClientAsync(); + for (var i = MessagesCount; i > 0; i--) + { + await client2.PublishAsync("x", i.ToString(), MqttQualityOfServiceLevel.ExactlyOnce); + } + + await Task.Delay(5000); + + for (var i = MessagesCount; i > 0; i--) + { + Assert.AreEqual(i, receivedValues[MessagesCount - i]); + } + } + } + [TestMethod] public async Task Send_Reply_For_Any_Received_Message() { From fbf7d9bb5c062b61e5f6460b2cb6f1df33952f13 Mon Sep 17 00:00:00 2001 From: "tamas.kurucsai" Date: Fri, 29 Jan 2021 07:03:56 +0100 Subject: [PATCH 4/5] corrected thread safety of ping timers after adding delayed acknowledgement --- Source/MQTTnet/Client/MqttClient.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index bbdaca9..c5f11af 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -25,8 +25,8 @@ namespace MQTTnet.Client { readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); - readonly Stopwatch _sendTracker = new Stopwatch(); - readonly Stopwatch _receiveTracker = new Stopwatch(); + volatile int _lastSentAtTicks; + volatile int _lastReceivedAtTicks; readonly object _disconnectLock = new object(); readonly IMqttClientAdapterFactory _adapterFactory; @@ -102,8 +102,8 @@ namespace MQTTnet.Client authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false); } - _sendTracker.Restart(); - _receiveTracker.Restart(); + _lastSentAtTicks = Environment.TickCount; + _lastReceivedAtTicks = Environment.TickCount; if (Options.KeepAlivePeriod != TimeSpan.Zero) { @@ -382,7 +382,7 @@ namespace MQTTnet.Client { cancellationToken.ThrowIfCancellationRequested(); - _sendTracker.Restart(); + _lastReceivedAtTicks = Environment.TickCount; return _adapter.SendPacketAsync(packet, cancellationToken); } @@ -401,7 +401,7 @@ namespace MQTTnet.Client { try { - _sendTracker.Restart(); + _lastSentAtTicks = Environment.TickCount; await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false); } catch (Exception e) @@ -437,7 +437,7 @@ namespace MQTTnet.Client while (!cancellationToken.IsCancellationRequested) { // Values described here: [MQTT-3.1.2-24]. - var waitTime = keepAlivePeriod - _sendTracker.Elapsed; + var waitTime = keepAlivePeriod - TimeSpan.FromMilliseconds(unchecked(Environment.TickCount - _lastSentAtTicks)); if (waitTime <= TimeSpan.Zero) { @@ -546,7 +546,7 @@ namespace MQTTnet.Client { try { - _receiveTracker.Restart(); + _lastReceivedAtTicks = Environment.TickCount; if (packet is MqttPublishPacket publishPacket) { From 90b37a1e2de247c6fdd09d3c06a1c40b5294964d Mon Sep 17 00:00:00 2001 From: "tamas.kurucsai" Date: Sat, 20 Mar 2021 10:36:44 +0100 Subject: [PATCH 5/5] reimplemented delayed acknowledgement using a property to switch off automatic acknowledgement and a new method Acknowledge() in the event handler --- Source/MQTTnet/Client/MqttClient.cs | 70 ++++++++----------- ...MqttApplicationMessageReceivedEventArgs.cs | 41 ++++++++++- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 3 +- 3 files changed, 69 insertions(+), 45 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index da84f41..47ac903 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -641,32 +641,13 @@ namespace MQTTnet.Client } var publishPacket = publishPacketDequeueResult.Item; - var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket); + var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket, cancellationToken); - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + if (eventArgs.AutoAcknowledge) { - // no response required - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - if (!eventArgs.ProcessingFailed) - { - var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(publishPacket, eventArgs.ReasonCode); - await SendResponseForReceivedPublishPacket(eventArgs, pubAckPacket, cancellationToken).ConfigureAwait(false); - } - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - if (!eventArgs.ProcessingFailed) - { - var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(publishPacket, eventArgs.ReasonCode); - await SendResponseForReceivedPublishPacket(eventArgs, pubRecPacket, cancellationToken).ConfigureAwait(false); - } - } - else - { - throw new MqttProtocolViolationException("Received a not supported QoS level."); + await eventArgs.Acknowledge(); } + } catch (OperationCanceledException) { @@ -678,29 +659,34 @@ namespace MQTTnet.Client } } - async Task SendResponseForReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs, MqttBasePacket resultPacket, CancellationToken cancellationToken) + internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs) { - if (eventArgs.PendingTask == null) + if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { - await SendAsync(resultPacket, cancellationToken).ConfigureAwait(false); + // no response required } - else + else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - _ = eventArgs.PendingTask.ContinueWith(async x => + if (!eventArgs.ProcessingFailed) { - try - { - if (x.Result) await SendAsync(resultPacket, cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - } - catch (Exception exception) - { - _logger.Error(exception, "Error while handling application message."); - } - }); + var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); + return SendAsync(pubAckPacket, eventArgs.CancellationToken); + } } + else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + if (!eventArgs.ProcessingFailed) + { + var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); + return SendAsync(pubRecPacket, eventArgs.CancellationToken); + } + } + else + { + throw new MqttProtocolViolationException("Received a not supported QoS level."); + } + + return PlatformAbstractionLayer.CompletedTask; } Task ProcessReceivedPubRecPacket(MqttPubRecPacket pubRecPacket, CancellationToken cancellationToken) @@ -775,10 +761,10 @@ namespace MQTTnet.Client return _adapter.PacketFormatterAdapter.DataConverter.CreateClientPublishResult(pubRecPacket, pubCompPacket); } - async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) + async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) { var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); - var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); + var eventArgs = new MqttApplicationMessageReceivedEventArgs(this, publishPacket, cancellationToken, Options.ClientId, applicationMessage); var handler = ApplicationMessageReceivedHandler; if (handler != null) diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs index bd80dc6..8e4f2f5 100644 --- a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs @@ -1,16 +1,41 @@ -using System; +using MQTTnet.Client; +using MQTTnet.Implementations; +using MQTTnet.Packets; +using System; +using System.Threading; using System.Threading.Tasks; namespace MQTTnet { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { + public MqttApplicationMessageReceivedEventArgs(MqttClient client, MqttPublishPacket publishPacket, CancellationToken cancellationToken, string clientId, MqttApplicationMessage applicationMessage) + { + Client = client ?? throw new ArgumentNullException(nameof(client)); + PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); + CancellationToken = cancellationToken; + acknowledged = 0; + ClientId = clientId; + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + AutoAcknowledge = true; + } + public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) { + acknowledged = 1; ClientId = clientId; ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + AutoAcknowledge = true; } + internal MqttClient Client { get; } + + internal CancellationToken CancellationToken { get; } + + internal MqttPublishPacket PublishPacket { get; } + + int acknowledged; + /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. @@ -31,6 +56,18 @@ namespace MQTTnet public object Tag { get; set; } - public Task PendingTask { get; set; } + public bool AutoAcknowledge { get; set; } + + public Task Acknowledge() + { + if (Interlocked.CompareExchange(ref acknowledged, 1, 0) == 0) + { + return Client.AcknowledgeReceivedPublishPacket(this); + } + else + { + return PlatformAbstractionLayer.CompletedTask; + } + } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index 36daed4..c28fdf9 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -465,7 +465,8 @@ namespace MQTTnet.Tests Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) { var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString()); - eventArgs.PendingTask = Task.Delay(value).ContinueWith(x => true); + eventArgs.AutoAcknowledge = false; + Task.Delay(value).ContinueWith(x => eventArgs.Acknowledge()); System.Diagnostics.Debug.WriteLine($"received {value}"); lock (receivedValues)