Browse Source

Process messages async

This commit fixes issues which were caused by processing messages synchronously in the packet receiver loop. This blocked KeepAlive and Ack packets from being processed while a message was processed.

Fixes #648
Fixes #587
Fixes #829
release/3.x.x
PSanetra 4 years ago
parent
commit
fcd6b7ff52
4 changed files with 120 additions and 32 deletions
  1. +2
    -0
      Build/MQTTnet.nuspec
  2. +64
    -28
      Source/MQTTnet/Client/MqttClient.cs
  3. +9
    -3
      Source/MQTTnet/Internal/AsyncQueue.cs
  4. +45
    -1
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs

+ 2
- 0
Build/MQTTnet.nuspec View File

@@ -17,6 +17,8 @@
* [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol. * [LowLevelMqttClient] Added low level MQTT client in order to provide more flexibility when using the MQTT protocol. This client requires detailed knowledge about the MQTT protocol.
* [Client] Improve connection stability (thanks to @jltjohanlindqvist). * [Client] Improve connection stability (thanks to @jltjohanlindqvist).
* [Client] Support WithConnectionUri to configure client (thanks to @PMExtra). * [Client] Support WithConnectionUri to configure client (thanks to @PMExtra).
* [Client] Support PublishAsync with QoS 1 and QoS 2 from within an ApplicationMessageReceivedHandler (#648, #587, thanks to @PSanetra).
* [Client] Fixed MqttCommunicationTimedOutExceptions, caused by a long running ApplicationMessageReceivedHandler, which blocked MQTT packets from being processed (#829, thanks to @PSanetra).
* [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe). * [ManagedClient] Added builder class for MqttClientUnsubscribeOptions (thanks to @dominikviererbe).
* [ManagedClient] Added support for persisted sessions (thansk to @PMExtra). * [ManagedClient] Added support for persisted sessions (thansk to @PMExtra).
* [ManagedClient] Fixed a memory leak (thanks to @zawodskoj). * [ManagedClient] Fixed a memory leak (thanks to @zawodskoj).


+ 64
- 28
Source/MQTTnet/Client/MqttClient.cs View File

@@ -34,6 +34,9 @@ namespace MQTTnet.Client
private CancellationTokenSource _backgroundCancellationTokenSource; private CancellationTokenSource _backgroundCancellationTokenSource;
private Task _packetReceiverTask; private Task _packetReceiverTask;
private Task _keepAlivePacketsSenderTask; private Task _keepAlivePacketsSenderTask;
private Task _publishPacketReceiverTask;

private AsyncQueue<MqttPublishPacket> _publishPacketReceiverQueue;


private IMqttChannelAdapter _adapter; private IMqttChannelAdapter _adapter;
private bool _cleanDisconnectInitiated; private bool _cleanDisconnectInitiated;
@@ -88,6 +91,9 @@ namespace MQTTnet.Client
await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false); await _adapter.ConnectAsync(options.CommunicationTimeout, combined.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established."); _logger.Verbose("Connection with server established.");


_publishPacketReceiverQueue = new AsyncQueue<MqttPublishPacket>();
_publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken);

_packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken); _packetReceiverTask = Task.Run(() => TryReceivePacketsAsync(backgroundCancellationToken), backgroundCancellationToken);


authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false); authenticateResult = await AuthenticateAsync(adapter, options.WillMessage, combined.Token).ConfigureAwait(false);
@@ -230,6 +236,9 @@ namespace MQTTnet.Client
_backgroundCancellationTokenSource?.Dispose(); _backgroundCancellationTokenSource?.Dispose();
_backgroundCancellationTokenSource = null; _backgroundCancellationTokenSource = null;


_publishPacketReceiverQueue?.Dispose();
_publishPacketReceiverQueue = null;

_adapter?.Dispose(); _adapter?.Dispose();
_adapter = null; _adapter = null;
} }
@@ -300,9 +309,12 @@ namespace MQTTnet.Client
try try
{ {
var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender); var receiverTask = WaitForTaskAsync(_packetReceiverTask, sender);
var publishPacketReceiverTask = WaitForTaskAsync(_publishPacketReceiverTask, sender);
var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender); var keepAliveTask = WaitForTaskAsync(_keepAlivePacketsSenderTask, sender);


await Task.WhenAll(receiverTask, keepAliveTask).ConfigureAwait(false);
await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false);

_publishPacketReceiverQueue.Dispose();
} }
catch (Exception e) catch (Exception e)
{ {
@@ -522,7 +534,7 @@ namespace MQTTnet.Client


if (packet is MqttPublishPacket publishPacket) if (packet is MqttPublishPacket publishPacket)
{ {
await TryProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
EnqueueReceivedPublishPacket(publishPacket);
} }
else if (packet is MqttPubRelPacket pubRelPacket) else if (packet is MqttPubRelPacket pubRelPacket)
{ {
@@ -584,47 +596,71 @@ namespace MQTTnet.Client
} }
} }


private async Task TryProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
private void EnqueueReceivedPublishPacket(MqttPublishPacket publishPacket)
{ {
try try
{ {
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
_publishPacketReceiverQueue.Enqueue(publishPacket);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while enqueueing application message.");
}
}

private async Task ProcessReceivedPublishPackets(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{ {
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken);

if (!publishPacketDequeueResult.IsSuccess)
{
return;
}

var publishPacket = publishPacketDequeueResult.Item;

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
await SendAsync(new MqttPubAckPacket
await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
{ {
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
}, cancellationToken).ConfigureAwait(false);
await SendAsync(new MqttPubAckPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
}, cancellationToken).ConfigureAwait(false);
}
} }
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{ {
var pubRecPacket = new MqttPubRecPacket
if (await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false))
{ {
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};
var pubRecPacket = new MqttPubRecPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubRecReasonCode.Success
};


await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
}
}
else
{
throw new MqttProtocolViolationException("Received a not supported QoS level.");
} }
} }
else
catch (Exception exception)
{ {
throw new MqttProtocolViolationException("Received a not supported QoS level.");
_logger.Error(exception, "Error while handling application message.");
} }
} }
catch (Exception exception)
{
_logger.Error(exception, "Error while handling application message.");
}
} }


private async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken) private async Task<MqttClientPublishResult> PublishAtMostOnce(MqttPublishPacket publishPacket, CancellationToken cancellationToken)


Source/MQTTnet/Internal/AsyncBlockingQueue.cs → Source/MQTTnet/Internal/AsyncQueue.cs View File

@@ -23,9 +23,15 @@ namespace MQTTnet.Internal
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();
try
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
catch (OperationCanceledException)
{
return new AsyncQueueDequeueResult<TItem>(false, default(TItem));
}


if (_queue.TryDequeue(out var item)) if (_queue.TryDequeue(out var item))
{ {

+ 45
- 1
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs View File

@@ -12,6 +12,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;


@@ -434,6 +435,49 @@ namespace MQTTnet.Tests
} }
} }


[TestMethod]
public async Task Publish_QoS_1_In_ApplicationMessageReceiveHandler()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

const string client1Topic = "client1/topic";
const string client2Topic = "client2/topic";
const string expectedClient2Message = "hello client2";

var client1 = await testEnvironment.ConnectClientAsync();
client1.UseApplicationMessageReceivedHandler(async c =>
{
await client1.PublishAsync(client2Topic, expectedClient2Message, MqttQualityOfServiceLevel.AtLeastOnce);
});

await client1.SubscribeAsync(client1Topic, MqttQualityOfServiceLevel.AtLeastOnce);

var client2 = await testEnvironment.ConnectClientAsync();

var client2TopicResults = new List<string>();

client2.UseApplicationMessageReceivedHandler(c =>
{
client2TopicResults.Add(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
});

await client2.SubscribeAsync(client2Topic);

var client3 = await testEnvironment.ConnectClientAsync();
var message = new MqttApplicationMessageBuilder().WithTopic(client1Topic).Build();
await client3.PublishAsync(message);
await client3.PublishAsync(message);

await Task.Delay(500);

Assert.AreEqual(2, client2TopicResults.Count);
Assert.AreEqual(expectedClient2Message, client2TopicResults[0]);
Assert.AreEqual(expectedClient2Message, client2TopicResults[1]);
}
}

[TestMethod] [TestMethod]
public async Task Subscribe_In_Callback_Events() public async Task Subscribe_In_Callback_Events()
{ {
@@ -565,7 +609,7 @@ namespace MQTTnet.Tests


for (var i = 0; i < 98; i++) for (var i = 0; i < 98; i++)
{ {
Assert.IsFalse(clients[i].IsConnected);
Assert.IsFalse(clients[i].IsConnected, $"clients[{i}] is not connected");
} }


Assert.IsTrue(clients[99].IsConnected); Assert.IsTrue(clients[99].IsConnected);


Loading…
Cancel
Save