diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 9e25687..08d21ed 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -17,6 +17,8 @@ * [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol. * [Client] Improve connection stability (thanks to @jltjohanlindqvist). * [Client] Support WithConnectionUri to configure client (thanks to @PMExtra). +* [Client] Support PublishAsync with QoS 1 and QoS 2 from within an ApplicationMessageReceivedHandler (#648, #587, thanks to @PSanetra). +* [Client] Fixed MqttCommunicationTimedOutExceptions, caused by a long running ApplicationMessageReceivedHandler, which blocked MQTT packets from being processed (#829, thanks to @PSanetra). * [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe). * [ManagedClient] Added support for persisted sessions (thansk to @PMExtra). * [ManagedClient] Fixed a memory leak (thanks to @zawodskoj). diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 2546fc1..dfcff8a 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -34,6 +34,9 @@ namespace MQTTnet.Client private CancellationTokenSource _backgroundCancellationTokenSource; private Task _packetReceiverTask; private Task _keepAlivePacketsSenderTask; + private Task _publishPacketReceiverTask; + + private AsyncQueue _publishPacketReceiverQueue; private IMqttChannelAdapter _adapter; private bool _cleanDisconnectInitiated; @@ -88,6 +91,9 @@ namespace MQTTnet.Client await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false); _logger.Verbose("Connection with server established."); + _publishPacketReceiverQueue = new AsyncQueue(); + _publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken); + _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken); authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false); @@ -230,6 +236,9 @@ namespace MQTTnet.Client _backgroundCancellationTokenSource?.Dispose(); _backgroundCancellationTokenSource = null; + _publishPacketReceiverQueue?.Dispose(); + _publishPacketReceiverQueue = null; + _adapter?.Dispose(); _adapter = null; } @@ -300,9 +309,12 @@ namespace MQTTnet.Client try { var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender); + var publishPacketReceiverTask = WaitForTaskAsync(_publishPacketReceiverTask, sender); var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender); - await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false); + await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false); + + _publishPacketReceiverQueue.Dispose(); } catch (Exception e) { @@ -522,7 +534,7 @@ namespace MQTTnet.Client if (packet is MqttPublishPacket publishPacket) { - await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); + EnqueueReceivedPublishPacket(publishPacket); } else if (packet is MqttPubRelPacket pubRelPacket) { @@ -584,47 +596,71 @@ namespace MQTTnet.Client } } - private async Task TryProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) + private void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket) { try { - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + _publishPacketReceiverQueue.Enqueue(publishPacket); + } + catch (Exception exception) + { + _logger.Error(exception, "Error while enqueueing application message."); + } + } + + private async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try { - if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) + var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken); + + if (!publishPacketDequeueResult.IsSuccess) + { + return; + } + + var publishPacket = publishPacketDequeueResult.Item; + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { - await SendAsync(new MqttPubAckPacket + await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false); + } + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) { - PacketIdentifier = publishPacket.PacketIdentifier, - ReasonCode = MqttPubAckReasonCode.Success - }, cancellationToken).ConfigureAwait(false); + await SendAsync(new MqttPubAckPacket + { + PacketIdentifier = publishPacket.PacketIdentifier, + ReasonCode = MqttPubAckReasonCode.Success + }, cancellationToken).ConfigureAwait(false); + } } - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - var pubRecPacket = new MqttPubRecPacket + if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false)) { - PacketIdentifier = publishPacket.PacketIdentifier, - ReasonCode = MqttPubRecReasonCode.Success - }; + var pubRecPacket = new MqttPubRecPacket + { + PacketIdentifier = publishPacket.PacketIdentifier, + ReasonCode = MqttPubRecReasonCode.Success + }; - await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); + await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false); + } + } + else + { + throw new MqttProtocolViolationException("Received a not supported QoS level."); } } - else + catch (Exception exception) { - throw new MqttProtocolViolationException("Received a not supported QoS level."); + _logger.Error(exception, "Error while handling application message."); } } - catch (Exception exception) - { - _logger.Error(exception, "Error while handling application message."); - } } private async Task PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken) diff --git a/Source/MQTTnet/Internal/AsyncBlockingQueue.cs b/Source/MQTTnet/Internal/AsyncQueue.cs similarity index 79% rename from Source/MQTTnet/Internal/AsyncBlockingQueue.cs rename to Source/MQTTnet/Internal/AsyncQueue.cs index 6cb80d2..43ad938 100644 --- a/Source/MQTTnet/Internal/AsyncBlockingQueue.cs +++ b/Source/MQTTnet/Internal/AsyncQueue.cs @@ -23,9 +23,15 @@ namespace MQTTnet.Internal { while (!cancellationToken.IsCancellationRequested) { - await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - - cancellationToken.ThrowIfCancellationRequested(); + try + { + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); + } + catch (OperationCanceledException) + { + return new AsyncQueueDequeueResult(false, default(TItem)); + } if (_queue.TryDequeue(out var item)) { diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index 2b86955..1a81512 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -12,6 +12,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; +using System.Text; using System.Threading; using System.Threading.Tasks; @@ -434,6 +435,49 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Publish_QoS_1_In_ApplicationMessageReceiveHandler() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServerAsync(); + + const string client1Topic = "client1/topic"; + const string client2Topic = "client2/topic"; + const string expectedClient2Message = "hello client2"; + + var client1 = await testEnvironment.ConnectClientAsync(); + client1.UseApplicationMessageReceivedHandler(async c => + { + await client1.PublishAsync(client2Topic, expectedClient2Message, MqttQualityOfServiceLevel.AtLeastOnce); + }); + + await client1.SubscribeAsync(client1Topic, MqttQualityOfServiceLevel.AtLeastOnce); + + var client2 = await testEnvironment.ConnectClientAsync(); + + var client2TopicResults = new List(); + + client2.UseApplicationMessageReceivedHandler(c => + { + client2TopicResults.Add(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); + }); + + await client2.SubscribeAsync(client2Topic); + + var client3 = await testEnvironment.ConnectClientAsync(); + var message = new MqttApplicationMessageBuilder().WithTopic(client1Topic).Build(); + await client3.PublishAsync(message); + await client3.PublishAsync(message); + + await Task.Delay(500); + + Assert.AreEqual(2, client2TopicResults.Count); + Assert.AreEqual(expectedClient2Message, client2TopicResults[0]); + Assert.AreEqual(expectedClient2Message, client2TopicResults[1]); + } + } + [TestMethod] public async Task Subscribe_In_Callback_Events() { @@ -565,7 +609,7 @@ namespace MQTTnet.Tests for (var i = 0; i < 98; i++) { - Assert.IsFalse(clients[i].IsConnected); + Assert.IsFalse(clients[i].IsConnected, $"clients[{i}] is not connected"); } Assert.IsTrue(clients[99].IsConnected);