Selaa lähdekoodia

Merge pull request #1075 from tkurucsai/feature/pending-appmsg-acknowledge

Feature/pending appmsg acknowledge
release/3.x.x
Christian 3 vuotta sitten
committed by GitHub
vanhempi
commit
d430ca0250
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 muutettua tiedostoa jossa 130 lisäystä ja 33 poistoa
  1. +40
    -32
      Source/MQTTnet/Client/MqttClient.cs
  2. +41
    -1
      Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs
  3. +49
    -0
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs

+ 40
- 32
Source/MQTTnet/Client/MqttClient.cs Näytä tiedosto

@@ -41,7 +41,7 @@ namespace MQTTnet.Client
long _isDisconnectPending;
bool _isConnected;
MqttClientDisconnectReason _disconnectReason;
DateTime _lastPacketSentTimestamp;

public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
@@ -390,7 +390,7 @@ namespace MQTTnet.Client
cancellationToken.ThrowIfCancellationRequested();

_lastPacketSentTimestamp = DateTime.UtcNow;
return _adapter.SendPacketAsync(packet, cancellationToken);
}

@@ -585,9 +585,9 @@ namespace MQTTnet.Client
if (!_packetDispatcher.TryDispatch(packet))
{
throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time.");
}
}
}
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
@@ -641,35 +641,13 @@ namespace MQTTnet.Client
}

var publishPacket = publishPacketDequeueResult.Item;
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket, cancellationToken);

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);

if (!eventArgs.ProcessingFailed)
{
var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(publishPacket, eventArgs.ReasonCode);
await SendAsync(pubAckPacket, cancellationToken).ConfigureAwait(false);
}
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
if (eventArgs.AutoAcknowledge)
{
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);

if (!eventArgs.ProcessingFailed)
{
var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(publishPacket, eventArgs.ReasonCode);
await SendAsync(pubRecPacket, cancellationToken).ConfigureAwait(false);
}
}
else
{
throw new MqttProtocolViolationException("Received a not supported QoS level.");
await eventArgs.Acknowledge();
}
}
catch (OperationCanceledException)
{
@@ -681,6 +659,36 @@ namespace MQTTnet.Client
}
}

internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs)
{
if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
// no response required
}
else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
if (!eventArgs.ProcessingFailed)
{
var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(eventArgs.PublishPacket, eventArgs.ReasonCode);
return SendAsync(pubAckPacket, eventArgs.CancellationToken);
}
}
else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
if (!eventArgs.ProcessingFailed)
{
var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(eventArgs.PublishPacket, eventArgs.ReasonCode);
return SendAsync(pubRecPacket, eventArgs.CancellationToken);
}
}
else
{
throw new MqttProtocolViolationException("Received a not supported QoS level.");
}

return PlatformAbstractionLayer.CompletedTask;
}

Task ProcessReceivedPubRecPacket(MqttPubRecPacket pubRecPacket, CancellationToken cancellationToken)
{
if (!_packetDispatcher.TryDispatch(pubRecPacket))
@@ -706,7 +714,7 @@ namespace MQTTnet.Client

// Also dispatch disconnect to waiting threads to generate a proper exception.
_packetDispatcher.FailAll(new MqttUnexpectedDisconnectReceivedException(disconnectPacket));
if (!DisconnectIsPending())
{
return DisconnectInternalAsync(_packetReceiverTask, null, null);
@@ -753,10 +761,10 @@ namespace MQTTnet.Client
return _adapter.PacketFormatterAdapter.DataConverter.CreateClientPublishResult(pubRecPacket, pubCompPacket);
}

async Task<MqttApplicationMessageReceivedEventArgs> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
async Task<MqttApplicationMessageReceivedEventArgs> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);
var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage);
var eventArgs = new MqttApplicationMessageReceivedEventArgs(this, publishPacket, cancellationToken, Options.ClientId, applicationMessage);

var handler = ApplicationMessageReceivedHandler;
if (handler != null)


+ 41
- 1
Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs Näytä tiedosto

@@ -1,15 +1,41 @@
using System;
using MQTTnet.Client;
using MQTTnet.Implementations;
using MQTTnet.Packets;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet
{
public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs
{
public MqttApplicationMessageReceivedEventArgs(MqttClient client, MqttPublishPacket publishPacket, CancellationToken cancellationToken, string clientId, MqttApplicationMessage applicationMessage)
{
Client = client ?? throw new ArgumentNullException(nameof(client));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
CancellationToken = cancellationToken;
acknowledged = 0;
ClientId = clientId;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
AutoAcknowledge = true;
}

public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage)
{
acknowledged = 1;
ClientId = clientId;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
AutoAcknowledge = true;
}

internal MqttClient Client { get; }

internal CancellationToken CancellationToken { get; }

internal MqttPublishPacket PublishPacket { get; }

int acknowledged;

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
@@ -29,5 +55,19 @@ namespace MQTTnet
public bool IsHandled { get; set; }
public object Tag { get; set; }

public bool AutoAcknowledge { get; set; }

public Task Acknowledge()
{
if (Interlocked.CompareExchange(ref acknowledged, 1, 0) == 0)
{
return Client.AcknowledgeReceivedPublishPacket(this);
}
else
{
return PlatformAbstractionLayer.CompletedTask;
}
}
}
}

+ 49
- 0
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs Näytä tiedosto

@@ -445,6 +445,55 @@ namespace MQTTnet.Tests
}
}

[TestMethod]
public async Task Preserve_Message_Order_With_Delayed_Acknowledgement()
{
// The messages are sent in reverse or to ensure that the delay in the handler
// needs longer for the first messages and later messages may be processed earlier (if there
// is an issue).
const int MessagesCount = 50;

using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync();

var client1 = await testEnvironment.ConnectClientAsync();
await client1.SubscribeAsync("x", MqttQualityOfServiceLevel.ExactlyOnce);

var receivedValues = new List<int>();

Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());
eventArgs.AutoAcknowledge = false;
Task.Delay(value).ContinueWith(x => eventArgs.Acknowledge());

System.Diagnostics.Debug.WriteLine($"received {value}");
lock (receivedValues)
{
receivedValues.Add(value);
}

return Task.CompletedTask;
}

client1.UseApplicationMessageReceivedHandler(Handler1);

var client2 = await testEnvironment.ConnectClientAsync();
for (var i = MessagesCount; i > 0; i--)
{
await client2.PublishAsync("x", i.ToString(), MqttQualityOfServiceLevel.ExactlyOnce);
}

await Task.Delay(5000);

for (var i = MessagesCount; i > 0; i--)
{
Assert.AreEqual(i, receivedValues[MessagesCount - i]);
}
}
}

[TestMethod]
public async Task Send_Reply_For_Any_Received_Message()
{


Ladataan…
Peruuta
Tallenna