diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 0107ffb..47ac903 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -41,7 +41,7 @@ namespace MQTTnet.Client long _isDisconnectPending; bool _isConnected; MqttClientDisconnectReason _disconnectReason; - + DateTime _lastPacketSentTimestamp; public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) @@ -390,7 +390,7 @@ namespace MQTTnet.Client cancellationToken.ThrowIfCancellationRequested(); _lastPacketSentTimestamp = DateTime.UtcNow; - + return _adapter.SendPacketAsync(packet, cancellationToken); } @@ -585,9 +585,9 @@ namespace MQTTnet.Client if (!_packetDispatcher.TryDispatch(packet)) { throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time."); - } } } + } catch (Exception exception) { if (_cleanDisconnectInitiated) @@ -641,35 +641,13 @@ namespace MQTTnet.Client } var publishPacket = publishPacketDequeueResult.Item; + var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket, cancellationToken); - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - - if (!eventArgs.ProcessingFailed) - { - var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(publishPacket, eventArgs.ReasonCode); - await SendAsync(pubAckPacket, cancellationToken).ConfigureAwait(false); - } - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + if (eventArgs.AutoAcknowledge) { - var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - - if (!eventArgs.ProcessingFailed) - { - var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(publishPacket, eventArgs.ReasonCode); - await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); - } - } - else - { - throw new MqttProtocolViolationException("Received a not supported QoS level."); + await eventArgs.Acknowledge(); } + } catch (OperationCanceledException) { @@ -681,6 +659,36 @@ namespace MQTTnet.Client } } + internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs) + { + if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + // no response required + } + else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + if (!eventArgs.ProcessingFailed) + { + var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); + return SendAsync(pubAckPacket, eventArgs.CancellationToken); + } + } + else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + if (!eventArgs.ProcessingFailed) + { + var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(eventArgs.PublishPacket, eventArgs.ReasonCode); + return SendAsync(pubRecPacket, eventArgs.CancellationToken); + } + } + else + { + throw new MqttProtocolViolationException("Received a not supported QoS level."); + } + + return PlatformAbstractionLayer.CompletedTask; + } + Task ProcessReceivedPubRecPacket(MqttPubRecPacket pubRecPacket, CancellationToken cancellationToken) { if (!_packetDispatcher.TryDispatch(pubRecPacket)) @@ -706,7 +714,7 @@ namespace MQTTnet.Client // Also dispatch disconnect to waiting threads to generate a proper exception. _packetDispatcher.FailAll(new MqttUnexpectedDisconnectReceivedException(disconnectPacket)); - + if (!DisconnectIsPending()) { return DisconnectInternalAsync(_packetReceiverTask, null, null); @@ -753,10 +761,10 @@ namespace MQTTnet.Client return _adapter.PacketFormatterAdapter.DataConverter.CreateClientPublishResult(pubRecPacket, pubCompPacket); } - async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket) + async Task HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) { var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); - var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage); + var eventArgs = new MqttApplicationMessageReceivedEventArgs(this, publishPacket, cancellationToken, Options.ClientId, applicationMessage); var handler = ApplicationMessageReceivedHandler; if (handler != null) diff --git a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs index c44f179..8e4f2f5 100644 --- a/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs +++ b/Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs @@ -1,15 +1,41 @@ -using System; +using MQTTnet.Client; +using MQTTnet.Implementations; +using MQTTnet.Packets; +using System; +using System.Threading; +using System.Threading.Tasks; namespace MQTTnet { public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs { + public MqttApplicationMessageReceivedEventArgs(MqttClient client, MqttPublishPacket publishPacket, CancellationToken cancellationToken, string clientId, MqttApplicationMessage applicationMessage) + { + Client = client ?? throw new ArgumentNullException(nameof(client)); + PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket)); + CancellationToken = cancellationToken; + acknowledged = 0; + ClientId = clientId; + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + AutoAcknowledge = true; + } + public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage) { + acknowledged = 1; ClientId = clientId; ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + AutoAcknowledge = true; } + internal MqttClient Client { get; } + + internal CancellationToken CancellationToken { get; } + + internal MqttPublishPacket PublishPacket { get; } + + int acknowledged; + /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. @@ -29,5 +55,19 @@ namespace MQTTnet public bool IsHandled { get; set; } public object Tag { get; set; } + + public bool AutoAcknowledge { get; set; } + + public Task Acknowledge() + { + if (Interlocked.CompareExchange(ref acknowledged, 1, 0) == 0) + { + return Client.AcknowledgeReceivedPublishPacket(this); + } + else + { + return PlatformAbstractionLayer.CompletedTask; + } + } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index c4fdb22..c28fdf9 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -445,6 +445,55 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Preserve_Message_Order_With_Delayed_Acknowledgement() + { + // The messages are sent in reverse or to ensure that the delay in the handler + // needs longer for the first messages and later messages may be processed earlier (if there + // is an issue). + const int MessagesCount = 50; + + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServerAsync(); + + var client1 = await testEnvironment.ConnectClientAsync(); + await client1.SubscribeAsync("x", MqttQualityOfServiceLevel.ExactlyOnce); + + var receivedValues = new List(); + + Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs) + { + var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString()); + eventArgs.AutoAcknowledge = false; + Task.Delay(value).ContinueWith(x => eventArgs.Acknowledge()); + + System.Diagnostics.Debug.WriteLine($"received {value}"); + lock (receivedValues) + { + receivedValues.Add(value); + } + + return Task.CompletedTask; + } + + client1.UseApplicationMessageReceivedHandler(Handler1); + + var client2 = await testEnvironment.ConnectClientAsync(); + for (var i = MessagesCount; i > 0; i--) + { + await client2.PublishAsync("x", i.ToString(), MqttQualityOfServiceLevel.ExactlyOnce); + } + + await Task.Delay(5000); + + for (var i = MessagesCount; i > 0; i--) + { + Assert.AreEqual(i, receivedValues[MessagesCount - i]); + } + } + } + [TestMethod] public async Task Send_Reply_For_Any_Received_Message() {