浏览代码

Refactor deferred ACK of publish packets.

release/3.x.x
Christian 3 年前
父节点
当前提交
fbb5cd45a0
共有 5 个文件被更改,包括 57 次插入61 次删除
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +26
    -26
      Source/MQTTnet/Client/MqttClient.cs
  3. +28
    -33
      Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs
  4. +1
    -1
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  5. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs

+ 1
- 0
Build/MQTTnet.nuspec 查看文件

@@ -16,6 +16,7 @@
* [Client] Changed exception types so that proper exceptions are thrown when connecting (#1082).
* [Client] Exposed Dup flag in application messages.
* [Client] Improved keep alive message sending.
* [Client] Added support for deferred message approval (#1075, thanks to @tkurucsai).
* [RpcClient] Fixed an issue when using a custom application message reveived handler (#1093).
* [RpcClient] Fixed a race condition when using timeout and cancellation token at the same time (BREAKING CHANGE!).
* [Server] Improved keep alive message sending.


+ 26
- 26
Source/MQTTnet/Client/MqttClient.cs 查看文件

@@ -230,21 +230,21 @@ namespace MQTTnet.Client
switch (applicationMessage.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
return PublishAtMostOnce(publishPacket, cancellationToken);
}
{
return PublishAtMostOnce(publishPacket, cancellationToken);
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
return PublishAtLeastOnceAsync(publishPacket, cancellationToken);
}
{
return PublishAtLeastOnceAsync(publishPacket, cancellationToken);
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
return PublishExactlyOnceAsync(publishPacket, cancellationToken);
}
{
return PublishExactlyOnceAsync(publishPacket, cancellationToken);
}
default:
{
throw new NotSupportedException();
}
{
throw new NotSupportedException();
}
}
}

@@ -365,7 +365,8 @@ namespace MQTTnet.Client
{
// This handler must be executed in a new thread because otherwise a dead lock may happen
// when trying to reconnect in that handler etc.
Task.Run(() => disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception, authenticateResult, _disconnectReason))).RunInBackground(_logger);
Task.Run(() => disconnectedHandler.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception, authenticateResult, _disconnectReason)))
.RunInBackground(_logger);
}
}
}
@@ -585,9 +586,9 @@ namespace MQTTnet.Client
if (!_packetDispatcher.TryDispatch(packet))
{
throw new MqttProtocolViolationException($"Received packet '{packet}' at an unexpected time.");
}
}
}
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
@@ -641,13 +642,12 @@ namespace MQTTnet.Client
}

var publishPacket = publishPacketDequeueResult.Item;
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket, cancellationToken);
var eventArgs = await HandleReceivedApplicationMessageAsync(publishPacket).ConfigureAwait(false);

if (eventArgs.AutoAcknowledge)
if (eventArgs.AutoAcknowledge)
{
await eventArgs.Acknowledge();
await eventArgs.AcknowledgeAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
@@ -659,7 +659,7 @@ namespace MQTTnet.Client
}
}

internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs)
internal Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs, CancellationToken cancellationToken)
{
if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
@@ -670,7 +670,7 @@ namespace MQTTnet.Client
if (!eventArgs.ProcessingFailed)
{
var pubAckPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(eventArgs.PublishPacket, eventArgs.ReasonCode);
return SendAsync(pubAckPacket, eventArgs.CancellationToken);
return SendAsync(pubAckPacket, cancellationToken);
}
}
else if (eventArgs.PublishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
@@ -678,7 +678,7 @@ namespace MQTTnet.Client
if (!eventArgs.ProcessingFailed)
{
var pubRecPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(eventArgs.PublishPacket, eventArgs.ReasonCode);
return SendAsync(pubRecPacket, eventArgs.CancellationToken);
return SendAsync(pubRecPacket, cancellationToken);
}
}
else
@@ -710,7 +710,7 @@ namespace MQTTnet.Client

Task ProcessReceivedDisconnectPacket(MqttDisconnectPacket disconnectPacket)
{
_disconnectReason = (MqttClientDisconnectReason)(disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.NormalDisconnection);
_disconnectReason = (MqttClientDisconnectReason) (disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.NormalDisconnection);

// Also dispatch disconnect to waiting threads to generate a proper exception.
_packetDispatcher.FailAll(new MqttUnexpectedDisconnectReceivedException(disconnectPacket));
@@ -755,16 +755,16 @@ namespace MQTTnet.Client
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket, cancellationToken).ConfigureAwait(false);

var pubRelPacket = _adapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success);
var pubCompPacket = await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket, cancellationToken).ConfigureAwait(false);

return _adapter.PacketFormatterAdapter.DataConverter.CreateClientPublishResult(pubRecPacket, pubCompPacket);
}

async Task<MqttApplicationMessageReceivedEventArgs> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
async Task<MqttApplicationMessageReceivedEventArgs> HandleReceivedApplicationMessageAsync(MqttPublishPacket publishPacket)
{
var applicationMessage = _adapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);
var eventArgs = new MqttApplicationMessageReceivedEventArgs(this, publishPacket, cancellationToken, Options.ClientId, applicationMessage);
var eventArgs = new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage, publishPacket, AcknowledgeReceivedPublishPacket);

var handler = ApplicationMessageReceivedHandler;
if (handler != null)
@@ -813,4 +813,4 @@ namespace MQTTnet.Client
return Interlocked.CompareExchange(ref _isDisconnectPending, 1, 0) != 0;
}
}
}
}

+ 28
- 33
Source/MQTTnet/MqttApplicationMessageReceivedEventArgs.cs 查看文件

@@ -1,41 +1,30 @@
using MQTTnet.Client;
using MQTTnet.Implementations;
using MQTTnet.Packets;
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Packets;

namespace MQTTnet
{
public sealed class MqttApplicationMessageReceivedEventArgs : EventArgs
{
public MqttApplicationMessageReceivedEventArgs(MqttClient client, MqttPublishPacket publishPacket, CancellationToken cancellationToken, string clientId, MqttApplicationMessage applicationMessage)
{
Client = client ?? throw new ArgumentNullException(nameof(client));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
CancellationToken = cancellationToken;
acknowledged = 0;
ClientId = clientId;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
AutoAcknowledge = true;
}

public MqttApplicationMessageReceivedEventArgs(string clientId, MqttApplicationMessage applicationMessage)
readonly Func<MqttApplicationMessageReceivedEventArgs, CancellationToken, Task> _acknowledgeHandler;
int _isAcknowledged;
public MqttApplicationMessageReceivedEventArgs(
string clientId,
MqttApplicationMessage applicationMessage,
MqttPublishPacket publishPacket,
Func<MqttApplicationMessageReceivedEventArgs, CancellationToken, Task> acknowledgeHandler)
{
acknowledged = 1;
ClientId = clientId;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
AutoAcknowledge = true;
PublishPacket = publishPacket;
_acknowledgeHandler = acknowledgeHandler;
}

internal MqttClient Client { get; }

internal CancellationToken CancellationToken { get; }

internal MqttPublishPacket PublishPacket { get; }

int acknowledged;

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
@@ -53,21 +42,27 @@ namespace MQTTnet
/// This value can be used in user code for custom control flow.
/// </summary>
public bool IsHandled { get; set; }

/// <summary>
/// Gets ir sets whether the library should send MQTT ACK packets automatically if required.
/// </summary>
public bool AutoAcknowledge { get; set; } = true;
public object Tag { get; set; }

public bool AutoAcknowledge { get; set; }

public Task Acknowledge()
{
if (Interlocked.CompareExchange(ref acknowledged, 1, 0) == 0)
public Task AcknowledgeAsync(CancellationToken cancellationToken)
{
if (_acknowledgeHandler == null)
{
return Client.AcknowledgeReceivedPublishPacket(this);
throw new NotSupportedException("Deferred acknowledgement of application message is not yet supported in MQTTnet server.");
}
else
if (Interlocked.CompareExchange(ref _isAcknowledged, 1, 0) == 0)
{
return PlatformAbstractionLayer.CompletedTask;
return _acknowledgeHandler(this, cancellationToken);
}
throw new InvalidOperationException("The application message is already acknowledged.");
}
}
}

+ 1
- 1
Source/MQTTnet/Server/MqttServerEventDispatcher.cs 查看文件

@@ -108,7 +108,7 @@ namespace MQTTnet.Server
return;
}

await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage)).ConfigureAwait(false);
await handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage, null, null)).ConfigureAwait(false);
}
catch (Exception exception)
{


+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs 查看文件

@@ -466,7 +466,7 @@ namespace MQTTnet.Tests
{
var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());
eventArgs.AutoAcknowledge = false;
Task.Delay(value).ContinueWith(x => eventArgs.Acknowledge());
Task.Delay(value).ContinueWith(x => eventArgs.AcknowledgeAsync(CancellationToken.None));

System.Diagnostics.Debug.WriteLine($"received {value}");
lock (receivedValues)


正在加载...
取消
保存