瀏覽代碼

Refactor package object model.

release/3.x.x
Christian Kratky 4 年之前
父節點
當前提交
b7fb0c1002
共有 71 個檔案被更改,包括 758 行新增579 行删除
  1. +5
    -5
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  2. +1
    -3
      Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
  3. +15
    -11
      Source/MQTTnet/Adapter/MqttChannelAdapter.cs
  4. +7
    -3
      Source/MQTTnet/Client/MqttClient.cs
  5. +7
    -1
      Source/MQTTnet/Formatter/IMqttDataConverter.cs
  6. +13
    -13
      Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs
  7. +22
    -0
      Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs
  8. +19
    -19
      Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs
  9. +1
    -1
      Source/MQTTnet/Formatter/V3/MqttV311PacketFormatter.cs
  10. +21
    -0
      Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
  11. +41
    -41
      Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs
  12. +9
    -10
      Source/MQTTnet/Formatter/V5/MqttV500PropertiesWriter.cs
  13. +7
    -2
      Source/MQTTnet/Implementations/MqttTcpChannel.cs
  14. +14
    -8
      Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs
  15. +6
    -8
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  16. +14
    -9
      Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs
  17. +1
    -8
      Source/MQTTnet/Internal/Disposable.cs
  18. +1
    -1
      Source/MQTTnet/Internal/TestMqttChannel.cs
  19. +5
    -4
      Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs
  20. +2
    -2
      Source/MQTTnet/PacketDispatcher/MqttPacketDispatcher.cs
  21. +1
    -1
      Source/MQTTnet/Packets/IMqttPacketWithIdentifier.cs
  22. +1
    -1
      Source/MQTTnet/Packets/MqttAuthPacket.cs
  23. +1
    -1
      Source/MQTTnet/Packets/MqttAuthPacketProperties.cs
  24. +0
    -7
      Source/MQTTnet/Packets/MqttBasePublishPacket.cs
  25. +1
    -1
      Source/MQTTnet/Packets/MqttConnAckPacket.cs
  26. +1
    -1
      Source/MQTTnet/Packets/MqttConnAckPacketProperties.cs
  27. +1
    -1
      Source/MQTTnet/Packets/MqttConnectPacket.cs
  28. +1
    -1
      Source/MQTTnet/Packets/MqttConnectPacketProperties.cs
  29. +1
    -1
      Source/MQTTnet/Packets/MqttDisconnectPacket.cs
  30. +1
    -1
      Source/MQTTnet/Packets/MqttDisconnectPacketProperties.cs
  31. +4
    -1
      Source/MQTTnet/Packets/MqttPingReqPacket.cs
  32. +4
    -1
      Source/MQTTnet/Packets/MqttPingRespPacket.cs
  33. +3
    -1
      Source/MQTTnet/Packets/MqttPubAckPacket.cs
  34. +1
    -1
      Source/MQTTnet/Packets/MqttPubAckPacketProperties.cs
  35. +3
    -1
      Source/MQTTnet/Packets/MqttPubCompPacket.cs
  36. +1
    -1
      Source/MQTTnet/Packets/MqttPubCompPacketProperties.cs
  37. +3
    -1
      Source/MQTTnet/Packets/MqttPubRecPacket.cs
  38. +1
    -1
      Source/MQTTnet/Packets/MqttPubRecPacketProperties.cs
  39. +3
    -1
      Source/MQTTnet/Packets/MqttPubRelPacket.cs
  40. +1
    -1
      Source/MQTTnet/Packets/MqttPubRelPacketProperties.cs
  41. +3
    -1
      Source/MQTTnet/Packets/MqttPublishPacket.cs
  42. +1
    -1
      Source/MQTTnet/Packets/MqttPublishPacketProperties.cs
  43. +2
    -2
      Source/MQTTnet/Packets/MqttSubAckPacket.cs
  44. +1
    -1
      Source/MQTTnet/Packets/MqttSubAckPacketProperties.cs
  45. +2
    -2
      Source/MQTTnet/Packets/MqttSubscribePacket.cs
  46. +1
    -1
      Source/MQTTnet/Packets/MqttSubscribePacketProperties.cs
  47. +2
    -2
      Source/MQTTnet/Packets/MqttUnsubAckPacket.cs
  48. +1
    -1
      Source/MQTTnet/Packets/MqttUnsubAckPacketProperties.cs
  49. +2
    -2
      Source/MQTTnet/Packets/MqttUnsubscribePacket.cs
  50. +1
    -1
      Source/MQTTnet/Packets/MqttUnsubscribePacketProperties.cs
  51. +1
    -1
      Source/MQTTnet/Packets/MqttUserProperty.cs
  52. +1
    -0
      Source/MQTTnet/Server/IMqttServerOptions.cs
  53. +16
    -36
      Source/MQTTnet/Server/MqttClientConnection.cs
  54. +0
    -93
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  55. +1
    -1
      Source/MQTTnet/Server/MqttClientSession.cs
  56. +19
    -24
      Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs
  57. +6
    -0
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  58. +6
    -3
      Source/MQTTnet/Server/MqttClientSubscribeResult.cs
  59. +21
    -32
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  60. +115
    -0
      Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs
  61. +2
    -0
      Source/MQTTnet/Server/MqttServerOptions.cs
  62. +61
    -2
      Tests/MQTTnet.Core.Tests/AsyncLock_Tests.cs
  63. +47
    -3
      Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs
  64. +39
    -4
      Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs
  65. +22
    -1
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  66. +2
    -3
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs
  67. +0
    -118
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs
  68. +78
    -0
      Tests/MQTTnet.Core.Tests/Server_Connection_Tests.cs
  69. +58
    -0
      Tests/MQTTnet.Core.Tests/Server_KeepAlive_Tests.cs
  70. +0
    -2
      Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs
  71. +3
    -67
      Tests/MQTTnet.Core.Tests/Server_Tests.cs

+ 5
- 5
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs 查看文件

@@ -66,10 +66,10 @@ namespace MQTTnet.AspNetCore
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public long BytesSent { get; set; }

public long BytesReceived { get; set; }

public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; }
public bool IsReadingPacket { get; private set; }

IHttpContextFeature Http => Connection.Features.Get<IHttpContextFeature>();

@@ -128,7 +128,7 @@ namespace MQTTnet.AspNetCore
else
{
// we did receive something but the message is not yet complete
ReadingPacketStartedCallback?.Invoke();
IsReadingPacket = true;
}
}
else if (readResult.IsCompleted)
@@ -147,14 +147,14 @@ namespace MQTTnet.AspNetCore
}
catch (Exception e)
{
// completing the cannels makes sure that there is no more data read after a protocol error
// completing the channel makes sure that there is no more data read after a protocol error
_input?.Complete(e);
_output?.Complete(e);
throw;
}
finally
{
ReadingPacketCompletedCallback?.Invoke();
IsReadingPacket = false;
}

cancellationToken.ThrowIfCancellationRequested();


+ 1
- 3
Source/MQTTnet/Adapter/IMqttChannelAdapter.cs 查看文件

@@ -21,9 +21,7 @@ namespace MQTTnet.Adapter

long BytesReceived { get; }

Action ReadingPacketStartedCallback { get; set; }

Action ReadingPacketCompletedCallback { get; set; }
bool IsReadingPacket { get; }

Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken);



+ 15
- 11
Source/MQTTnet/Adapter/MqttChannelAdapter.cs 查看文件

@@ -51,10 +51,10 @@ namespace MQTTnet.Adapter
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public long BytesSent => Interlocked.Read(ref _bytesSent);

public long BytesReceived => Interlocked.Read(ref _bytesReceived);

public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; }
public bool IsReadingPacket { get; private set; }

public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
@@ -79,7 +79,7 @@ namespace MQTTnet.Adapter
throw;
}

WrapException(exception);
WrapAndThrowException(exception);
}
}

@@ -107,7 +107,7 @@ namespace MQTTnet.Adapter
throw;
}

WrapException(exception);
WrapAndThrowException(exception);
}
}

@@ -148,7 +148,7 @@ namespace MQTTnet.Adapter
throw;
}

WrapException(exception);
WrapAndThrowException(exception);
}
finally
{
@@ -214,7 +214,7 @@ namespace MQTTnet.Adapter
throw;
}

WrapException(exception);
WrapAndThrowException(exception);
}

return null;
@@ -253,7 +253,7 @@ namespace MQTTnet.Adapter
return null;
}

ReadingPacketStartedCallback?.Invoke();
IsReadingPacket = true;

var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
@@ -293,7 +293,7 @@ namespace MQTTnet.Adapter
}
finally
{
ReadingPacketCompletedCallback?.Invoke();
IsReadingPacket = false;
}
}

@@ -304,7 +304,7 @@ namespace MQTTnet.Adapter
exception is MqttCommunicationException;
}

static void WrapException(Exception exception)
static void WrapAndThrowException(Exception exception)
{
if (exception is IOException && exception.InnerException is SocketException innerException)
{
@@ -313,11 +313,15 @@ namespace MQTTnet.Adapter

if (exception is SocketException socketException)
{
if (socketException.SocketErrorCode == SocketError.ConnectionAborted ||
socketException.SocketErrorCode == SocketError.OperationAborted)
if (socketException.SocketErrorCode == SocketError.OperationAborted)
{
throw new OperationCanceledException();
}
if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
{
throw new MqttCommunicationException(socketException);
}
}

if (exception is COMException comException)


+ 7
- 3
Source/MQTTnet/Client/MqttClient.cs 查看文件

@@ -395,9 +395,9 @@ namespace MQTTnet.Client
cancellationToken.ThrowIfCancellationRequested();

ushort identifier = 0;
if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier > 0)
{
identifier = packetWithIdentifier.PacketIdentifier.Value;
identifier = packetWithIdentifier.PacketIdentifier;
}

using (var packetAwaiter = _packetDispatcher.AddAwaiter<TResponsePacket>(identifier))
@@ -636,7 +636,8 @@ namespace MQTTnet.Client
{
try
{
var publishPacketDequeueResult = await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken);
var publishPacketDequeueResult =
await _publishPacketReceiverQueue.TryDequeueAsync(cancellationToken);
if (!publishPacketDequeueResult.IsSuccess)
{
return;
@@ -677,6 +678,9 @@ namespace MQTTnet.Client
throw new MqttProtocolViolationException("Received a not supported QoS level.");
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling application message.");


+ 7
- 1
Source/MQTTnet/Formatter/IMqttDataConverter.cs 查看文件

@@ -1,10 +1,12 @@
using MQTTnet.Client.Connecting;
using System.Collections.Generic;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

@@ -30,8 +32,12 @@ namespace MQTTnet.Formatter

MqttSubscribePacket CreateSubscribePacket(MqttClientSubscribeOptions options);

MqttSubAckPacket CreateSubAckPacket(MqttSubscribePacket subscribePacket, Server.MqttClientSubscribeResult subscribeResult);

MqttUnsubscribePacket CreateUnsubscribePacket(MqttClientUnsubscribeOptions options);

MqttUnsubAckPacket CreateUnsubAckPacket(MqttUnsubscribePacket unsubscribePacket, List<MqttUnsubscribeReasonCode> reasonCodes);

MqttDisconnectPacket CreateDisconnectPacket(MqttClientDisconnectOptions options);

MqttClientPublishResult CreatePublishResult(MqttPubAckPacket pubAckPacket);


+ 13
- 13
Source/MQTTnet/Formatter/MqttPacketFormatterAdapter.cs 查看文件

@@ -8,7 +8,7 @@ using MQTTnet.Packets;

namespace MQTTnet.Formatter
{
public class MqttPacketFormatterAdapter
public sealed class MqttPacketFormatterAdapter
{
IMqttPacketFormatter _formatter;
@@ -77,17 +77,6 @@ namespace MQTTnet.Formatter
UseProtocolVersion(protocolVersion);
}

private void UseProtocolVersion(MqttProtocolVersion protocolVersion)
{
if (protocolVersion == MqttProtocolVersion.Unknown)
{
throw new InvalidOperationException("MQTT protocol version is invalid.");
}

ProtocolVersion = protocolVersion;
_formatter = GetMqttPacketFormatter(protocolVersion, Writer);
}

public static IMqttPacketFormatter GetMqttPacketFormatter(MqttProtocolVersion protocolVersion, IMqttPacketWriter writer)
{
if (protocolVersion == MqttProtocolVersion.Unknown)
@@ -116,7 +105,18 @@ namespace MQTTnet.Formatter
}
}

MqttProtocolVersion ParseProtocolVersion(ReceivedMqttPacket receivedMqttPacket)
void UseProtocolVersion(MqttProtocolVersion protocolVersion)
{
if (protocolVersion == MqttProtocolVersion.Unknown)
{
throw new InvalidOperationException("MQTT protocol version is invalid.");
}

ProtocolVersion = protocolVersion;
_formatter = GetMqttPacketFormatter(protocolVersion, Writer);
}

static MqttProtocolVersion ParseProtocolVersion(ReceivedMqttPacket receivedMqttPacket)
{
if (receivedMqttPacket == null) throw new ArgumentNullException(nameof(receivedMqttPacket));



+ 22
- 0
Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs 查看文件

@@ -9,6 +9,7 @@ using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

@@ -172,6 +173,18 @@ namespace MQTTnet.Formatter.V3
return subscribePacket;
}

public MqttSubAckPacket CreateSubAckPacket(MqttSubscribePacket subscribePacket, Server.MqttClientSubscribeResult subscribeResult)
{
var subackPacket = new MqttSubAckPacket
{
PacketIdentifier = subscribePacket.PacketIdentifier
};

subackPacket.ReturnCodes.AddRange(subscribeResult.ReturnCodes);

return subackPacket;
}

public MqttUnsubscribePacket CreateUnsubscribePacket(MqttClientUnsubscribeOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
@@ -182,6 +195,15 @@ namespace MQTTnet.Formatter.V3
return unsubscribePacket;
}

public MqttUnsubAckPacket CreateUnsubAckPacket(MqttUnsubscribePacket unsubscribePacket, List<MqttUnsubscribeReasonCode> reasonCodes)
{
return new MqttUnsubAckPacket
{
PacketIdentifier = unsubscribePacket.PacketIdentifier,
ReasonCodes = reasonCodes
};
}

public MqttDisconnectPacket CreateDisconnectPacket(MqttClientDisconnectOptions options)
{
if (options.ReasonCode != MqttClientDisconnectReason.NormalDisconnection || options.ReasonString != null)


+ 19
- 19
Source/MQTTnet/Formatter/V3/MqttV310PacketFormatter.cs 查看文件

@@ -210,7 +210,7 @@ namespace MQTTnet.Formatter.V3

var topic = receivedMqttPacket.Body.ReadStringWithLengthPrefix();

ushort? packetIdentifier = null;
ushort packetIdentifier = 0;
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
packetIdentifier = receivedMqttPacket.Body.ReadTwoByteInteger();
@@ -414,12 +414,12 @@ namespace MQTTnet.Formatter.V3

static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
}
@@ -432,12 +432,12 @@ namespace MQTTnet.Formatter.V3

if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
}
else
{
@@ -471,36 +471,36 @@ namespace MQTTnet.Formatter.V3

static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
}

static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
}

static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
}
@@ -509,12 +509,12 @@ namespace MQTTnet.Formatter.V3
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

if (packet.TopicFilters?.Count > 0)
{
@@ -530,12 +530,12 @@ namespace MQTTnet.Formatter.V3

static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

if (packet.ReturnCodes?.Any() == true)
{
@@ -552,12 +552,12 @@ namespace MQTTnet.Formatter.V3
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

if (packet.TopicFilters?.Any() == true)
{
@@ -572,12 +572,12 @@ namespace MQTTnet.Formatter.V3

static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
}



+ 1
- 1
Source/MQTTnet/Formatter/V3/MqttV311PacketFormatter.cs 查看文件

@@ -4,7 +4,7 @@ using MQTTnet.Protocol;

namespace MQTTnet.Formatter.V3
{
public class MqttV311PacketFormatter : MqttV310PacketFormatter
public sealed class MqttV311PacketFormatter : MqttV310PacketFormatter
{
public MqttV311PacketFormatter(IMqttPacketWriter packetWriter)
: base(packetWriter)


+ 21
- 0
Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs 查看文件

@@ -200,6 +200,18 @@ namespace MQTTnet.Formatter.V5
return packet;
}

public MqttSubAckPacket CreateSubAckPacket(MqttSubscribePacket subscribePacket, Server.MqttClientSubscribeResult subscribeResult)
{
var subackPacket = new MqttSubAckPacket
{
PacketIdentifier = subscribePacket.PacketIdentifier
};

subackPacket.ReasonCodes.AddRange(subscribeResult.ReasonCodes);

return subackPacket;
}

public MqttUnsubscribePacket CreateUnsubscribePacket(MqttClientUnsubscribeOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
@@ -215,6 +227,15 @@ namespace MQTTnet.Formatter.V5
return packet;
}

public MqttUnsubAckPacket CreateUnsubAckPacket(MqttUnsubscribePacket unsubscribePacket, List<MqttUnsubscribeReasonCode> reasonCodes)
{
return new MqttUnsubAckPacket
{
PacketIdentifier = unsubscribePacket.PacketIdentifier,
ReasonCodes = reasonCodes
};
}

public MqttDisconnectPacket CreateDisconnectPacket(MqttClientDisconnectOptions options)
{
var packet = new MqttDisconnectPacket();


+ 41
- 41
Source/MQTTnet/Formatter/V5/MqttV500PacketEncoder.cs 查看文件

@@ -6,14 +6,13 @@ using MQTTnet.Protocol;

namespace MQTTnet.Formatter.V5
{
public class MqttV500PacketEncoder
public sealed class MqttV500PacketEncoder
{
private readonly IMqttPacketWriter _packetWriter;
readonly IMqttPacketWriter _packetWriter;

public MqttV500PacketEncoder()
: this(new MqttPacketWriter())
{

}

public MqttV500PacketEncoder(IMqttPacketWriter packetWriter)
@@ -21,7 +20,6 @@ namespace MQTTnet.Formatter.V5
_packetWriter = packetWriter;
}


public ArraySegment<byte> Encode(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
@@ -52,7 +50,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.FreeBuffer();
}

private static byte EncodePacket(MqttBasePacket packet, IMqttPacketWriter packetWriter)
static byte EncodePacket(MqttBasePacket packet, IMqttPacketWriter packetWriter)
{
switch (packet)
{
@@ -76,7 +74,7 @@ namespace MQTTnet.Formatter.V5
}
}

private static byte EncodeConnectPacket(MqttConnectPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeConnectPacket(MqttConnectPacket packet, IMqttPacketWriter packetWriter)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter));
@@ -176,7 +174,7 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Connect);
}

private static byte EncodeConnAckPacket(MqttConnAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeConnAckPacket(MqttConnAckPacket packet, IMqttPacketWriter packetWriter)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter));
@@ -221,7 +219,7 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.ConnAck);
}

private static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePublishPacket(MqttPublishPacket packet, IMqttPacketWriter packetWriter)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter));
@@ -235,12 +233,12 @@ namespace MQTTnet.Formatter.V5

if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
}
else
{
@@ -287,12 +285,12 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Publish, fixedHeader);
}

private static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubAckPacket(MqttPubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (packetWriter == null) throw new ArgumentNullException(nameof(packetWriter));

if (!packet.PacketIdentifier.HasValue)
if (packet.PacketIdentifier == 0)
{
throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
}
@@ -302,7 +300,7 @@ namespace MQTTnet.Formatter.V5
throw new MqttProtocolViolationException("PubAck packet must contain a reason code.");
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -320,9 +318,9 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
}

private static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubRecPacket(MqttPubRecPacket packet, IMqttPacketWriter packetWriter)
{
ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

if (!packet.ReasonCode.HasValue)
{
@@ -336,7 +334,7 @@ namespace MQTTnet.Formatter.V5
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

if (packetWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRecReasonCode.Success)
{
@@ -347,9 +345,9 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
}

private static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubRelPacket(MqttPubRelPacket packet, IMqttPacketWriter packetWriter)
{
ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

if (!packet.ReasonCode.HasValue)
{
@@ -363,7 +361,7 @@ namespace MQTTnet.Formatter.V5
propertiesWriter.WriteUserProperties(packet.Properties.UserProperties);
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
if (propertiesWriter.Length > 0 || packet.ReasonCode.Value != MqttPubRelReasonCode.Success)
{
@@ -374,16 +372,16 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
}

private static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
static byte EncodePubCompPacket(MqttPubCompPacket packet, IMqttPacketWriter packetWriter)
{
ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

if (!packet.ReasonCode.HasValue)
{
ThrowReasonCodeNotSetException();
}

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);
var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -401,13 +399,13 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
}

private static byte EncodeSubscribePacket(MqttSubscribePacket packet, IMqttPacketWriter packetWriter)
static byte EncodeSubscribePacket(MqttSubscribePacket packet, IMqttPacketWriter packetWriter)
{
if (packet.TopicFilters?.Any() != true) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -448,13 +446,13 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Subscribe, 0x02);
}

private static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeSubAckPacket(MqttSubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (packet.ReasonCodes?.Any() != true) throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");

ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -473,13 +471,13 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.SubAck);
}

private static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, IMqttPacketWriter packetWriter)
static byte EncodeUnsubscribePacket(MqttUnsubscribePacket packet, IMqttPacketWriter packetWriter)
{
if (packet.TopicFilters?.Any() != true) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);

packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -497,13 +495,13 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
}

private static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeUnsubAckPacket(MqttUnsubAckPacket packet, IMqttPacketWriter packetWriter)
{
if (packet.ReasonCodes?.Any() != true) throw new MqttProtocolViolationException("At least one reason code must be set[MQTT - 3.8.3 - 3].");

ThrowIfPacketIdentifierIsInvalid(packet);
ThrowIfPacketIdentifierIsInvalid(packet.PacketIdentifier, packet);
packetWriter.Write(packet.PacketIdentifier.Value);
packetWriter.Write(packet.PacketIdentifier);

var propertiesWriter = new MqttV500PropertiesWriter();
if (packet.Properties != null)
@@ -522,7 +520,7 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
}

private static byte EncodeDisconnectPacket(MqttDisconnectPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeDisconnectPacket(MqttDisconnectPacket packet, IMqttPacketWriter packetWriter)
{
if (!packet.ReasonCode.HasValue)
{
@@ -545,17 +543,17 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Disconnect);
}

private static byte EncodePingReqPacket()
static byte EncodePingReqPacket()
{
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PingReq);
}

private static byte EncodePingRespPacket()
static byte EncodePingRespPacket()
{
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PingResp);
}

private static byte EncodeAuthPacket(MqttAuthPacket packet, IMqttPacketWriter packetWriter)
static byte EncodeAuthPacket(MqttAuthPacket packet, IMqttPacketWriter packetWriter)
{
packetWriter.Write((byte)packet.ReasonCode);

@@ -573,14 +571,16 @@ namespace MQTTnet.Formatter.V5
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Auth);
}

private static void ThrowReasonCodeNotSetException()
static void ThrowReasonCodeNotSetException()
{
throw new MqttProtocolViolationException("The ReasonCode must be set for MQTT version 5.");
}

private static void ThrowIfPacketIdentifierIsInvalid(IMqttPacketWithIdentifier packet)
static void ThrowIfPacketIdentifierIsInvalid(ushort packetIdentifier, MqttBasePacket packet)
{
if (!packet.PacketIdentifier.HasValue)
// SUBSCRIBE, UNSUBSCRIBE, and PUBLISH(in cases where QoS > 0) Control Packets MUST contain a non-zero 16 - bit Packet Identifier[MQTT - 2.3.1 - 1].

if (packetIdentifier == 0)
{
throw new MqttProtocolViolationException($"Packet identifier is not set for {packet.GetType().Name}.");
}


+ 9
- 10
Source/MQTTnet/Formatter/V5/MqttV500PropertiesWriter.cs 查看文件

@@ -5,10 +5,9 @@ using MQTTnet.Protocol;

namespace MQTTnet.Formatter.V5
{
public class MqttV500PropertiesWriter
public sealed class MqttV500PropertiesWriter
{
// TODO: Consider lazy init on first write to avoid useless allocations.
private readonly MqttPacketWriter _packetWriter = new MqttPacketWriter();
readonly MqttPacketWriter _packetWriter = new MqttPacketWriter();

public int Length => _packetWriter.Length;

@@ -178,7 +177,7 @@ namespace MQTTnet.Formatter.V5
Write(MqttPropertyId.ResponseInformation, value);
}

private void Write(MqttPropertyId id, bool? value)
void Write(MqttPropertyId id, bool? value)
{
if (!value.HasValue)
{
@@ -189,7 +188,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.Write(value.Value ? (byte)0x1 : (byte)0x0);
}

private void Write(MqttPropertyId id, byte? value)
void Write(MqttPropertyId id, byte? value)
{
if (!value.HasValue)
{
@@ -200,7 +199,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.Write(value.Value);
}

private void Write(MqttPropertyId id, ushort? value)
void Write(MqttPropertyId id, ushort? value)
{
if (!value.HasValue)
{
@@ -211,7 +210,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.Write(value.Value);
}

private void WriteAsVariableLengthInteger(MqttPropertyId id, uint? value)
void WriteAsVariableLengthInteger(MqttPropertyId id, uint? value)
{
if (!value.HasValue)
{
@@ -222,7 +221,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.WriteVariableLengthInteger(value.Value);
}

private void WriteAsFourByteInteger(MqttPropertyId id, uint? value)
void WriteAsFourByteInteger(MqttPropertyId id, uint? value)
{
if (!value.HasValue)
{
@@ -236,7 +235,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.Write((byte)value.Value);
}

private void Write(MqttPropertyId id, string value)
void Write(MqttPropertyId id, string value)
{
if (value == null)
{
@@ -247,7 +246,7 @@ namespace MQTTnet.Formatter.V5
_packetWriter.WriteWithLengthPrefix(value);
}

private void Write(MqttPropertyId id, byte[] value)
void Write(MqttPropertyId id, byte[] value)
{
if (value == null)
{


+ 7
- 2
Source/MQTTnet/Implementations/MqttTcpChannel.cs 查看文件

@@ -10,6 +10,7 @@ using System.Runtime.ExceptionServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Exceptions;

namespace MQTTnet.Implementations
{
@@ -75,7 +76,7 @@ namespace MQTTnet.Implementations
cancellationToken.ThrowIfCancellationRequested();

var networkStream = socket.GetStream();
if (_options.TlsOptions?.UseTls == true)
{
var sslStream = new SslStream(networkStream, false, InternalUserCertificateValidationCallback);
@@ -144,7 +145,7 @@ namespace MQTTnet.Implementations

if (stream == null)
{
throw new ObjectDisposedException(nameof(stream));
return 0;
}

return await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
@@ -185,6 +186,10 @@ namespace MQTTnet.Implementations
}

await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);

// This subsequent call is required to check whether the socket is still connected.
// Without this call a broken connection is only recognized at the next call.
await stream.WriteAsync(PlatformAbstractionLayer.EmptyByteArray, 0, 0, cancellationToken);
}
}
catch (ObjectDisposedException)


+ 14
- 8
Source/MQTTnet/Implementations/MqttTcpServerAdapter.cs 查看文件

@@ -47,7 +47,7 @@ namespace MQTTnet.Implementations
{
throw new ArgumentException("TLS certificate is not set.");
}
var tlsCertificate = options.TlsEndpointOptions.CertificateProvider.GetCertificate();
if (!tlsCertificate.HasPrivateKey)
{
@@ -73,16 +73,22 @@ namespace MQTTnet.Implementations

void Cleanup()
{
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

foreach (var listener in _listeners)
try
{
listener.Dispose();
_cancellationTokenSource?.Cancel(false);
}
finally
{
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

_listeners.Clear();
foreach (var listener in _listeners)
{
listener.Dispose();
}

_listeners.Clear();
}
}

void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken)


+ 6
- 8
Source/MQTTnet/Implementations/MqttTcpServerListener.cs 查看文件

@@ -77,7 +77,7 @@ namespace MQTTnet.Implementations

_socket.Bind(_localEndPoint);
_socket.Listen(_options.ConnectionBacklog);
Task.Run(() => AcceptClientConnectionsAsync(cancellationToken), cancellationToken).Forget(_logger);

return true;
@@ -152,9 +152,7 @@ namespace MQTTnet.Implementations
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");

clientSocket.NoDelay = _options.NoDelay;

stream = clientSocket.GetStream();

X509Certificate2 clientCertificate = null;

if (_tlsCertificate != null)
@@ -208,17 +206,17 @@ namespace MQTTnet.Implementations
{
stream?.Dispose();
clientSocket?.Dispose();

_logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.",
remoteEndPoint,
_localEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");
}
catch (Exception disposeException)
{
_logger.Error(disposeException, "Error while cleaning up client connection");
}
}

_logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.",
remoteEndPoint,
_localEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");
}
}
}

+ 14
- 9
Source/MQTTnet/Implementations/PlatformAbstractionLayer.cs 查看文件

@@ -1,20 +1,25 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;

namespace MQTTnet.Implementations
{
public static class PlatformAbstractionLayer
{
public static Task CompletedTask
public static readonly byte[] EmptyByteArray = new byte[0];

#if NET452
public static Task CompletedTask => Task.FromResult(0);
#else
public static Task CompletedTask => Task.CompletedTask;
#endif

public static void Sleep(TimeSpan timeout)
{
get
{
#if NET452
return Task.FromResult(0);
#if NET452 || NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP3_1
System.Threading.Thread.Sleep(timeout);
#else
return Task.CompletedTask;
Task.Delay(timeout).Wait();
#endif
}
}

}
}

+ 1
- 8
Source/MQTTnet/Internal/Disposable.cs 查看文件

@@ -4,7 +4,7 @@ namespace MQTTnet.Internal
{
public abstract class Disposable : IDisposable
{
protected bool IsDisposed { get; private set; } = false;
protected bool IsDisposed { get; private set; }

protected void ThrowIfDisposed()
{
@@ -18,13 +18,6 @@ namespace MQTTnet.Internal
{
}

// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~Disposable()
// {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }

// This code added to correctly implement the disposable pattern.
public void Dispose()
{


+ 1
- 1
Source/MQTTnet/Internal/TestMqttChannel.cs 查看文件

@@ -8,7 +8,7 @@ namespace MQTTnet.Internal
{
public class TestMqttChannel : IMqttChannel
{
private readonly MemoryStream _stream;
readonly MemoryStream _stream;

public TestMqttChannel(MemoryStream stream)
{


+ 5
- 4
Source/MQTTnet/PacketDispatcher/MqttPacketAwaiter.cs 查看文件

@@ -27,10 +27,11 @@ namespace MQTTnet.PacketDispatcher
{
using (var timeoutToken = new CancellationTokenSource(timeout))
{
timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException()));

var packet = await _taskCompletionSource.Task.ConfigureAwait(false);
return (TPacket)packet;
using (timeoutToken.Token.Register(() => Fail(new MqttCommunicationTimedOutException())))
{
var packet = await _taskCompletionSource.Task.ConfigureAwait(false);
return (TPacket)packet;
}
}
}



+ 2
- 2
Source/MQTTnet/PacketDispatcher/MqttPacketDispatcher.cs 查看文件

@@ -34,9 +34,9 @@ namespace MQTTnet.PacketDispatcher
}

ushort identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
if (packet is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier > 0)
{
identifier = packetWithIdentifier.PacketIdentifier.Value;
identifier = packetWithIdentifier.PacketIdentifier;
}

var type = packet.GetType();


+ 1
- 1
Source/MQTTnet/Packets/IMqttPacketWithIdentifier.cs 查看文件

@@ -2,6 +2,6 @@
{
public interface IMqttPacketWithIdentifier
{
ushort? PacketIdentifier { get; set; }
ushort PacketIdentifier { get; set; }
}
}

+ 1
- 1
Source/MQTTnet/Packets/MqttAuthPacket.cs 查看文件

@@ -5,7 +5,7 @@ namespace MQTTnet.Packets
/// <summary>
/// Added in MQTTv5.0.0.
/// </summary>
public class MqttAuthPacket : MqttBasePacket
public sealed class MqttAuthPacket : MqttBasePacket
{
public MqttAuthenticateReasonCode ReasonCode { get; set; }



+ 1
- 1
Source/MQTTnet/Packets/MqttAuthPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttAuthPacketProperties
public sealed class MqttAuthPacketProperties
{
public string AuthenticationMethod { get; set; }



+ 0
- 7
Source/MQTTnet/Packets/MqttBasePublishPacket.cs 查看文件

@@ -1,7 +0,0 @@
namespace MQTTnet.Packets
{
public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort? PacketIdentifier { get; set; }
}
}

+ 1
- 1
Source/MQTTnet/Packets/MqttConnAckPacket.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttConnAckPacket : MqttBasePacket
public sealed class MqttConnAckPacket : MqttBasePacket
{
public MqttConnectReturnCode? ReturnCode { get; set; }



+ 1
- 1
Source/MQTTnet/Packets/MqttConnAckPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttConnAckPacketProperties
public sealed class MqttConnAckPacketProperties
{
public uint? SessionExpiryInterval { get; set; }



+ 1
- 1
Source/MQTTnet/Packets/MqttConnectPacket.cs 查看文件

@@ -1,6 +1,6 @@
namespace MQTTnet.Packets
{
public class MqttConnectPacket : MqttBasePacket
public sealed class MqttConnectPacket : MqttBasePacket
{
public string ClientId { get; set; }



+ 1
- 1
Source/MQTTnet/Packets/MqttConnectPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttConnectPacketProperties
public sealed class MqttConnectPacketProperties
{
public uint? WillDelayInterval { get; set; }



+ 1
- 1
Source/MQTTnet/Packets/MqttDisconnectPacket.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttDisconnectPacket : MqttBasePacket
public sealed class MqttDisconnectPacket : MqttBasePacket
{
#region Added in MQTTv5



+ 1
- 1
Source/MQTTnet/Packets/MqttDisconnectPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttDisconnectPacketProperties
public sealed class MqttDisconnectPacketProperties
{
public uint? SessionExpiryInterval { get; set; }



+ 4
- 1
Source/MQTTnet/Packets/MqttPingReqPacket.cs 查看文件

@@ -1,7 +1,10 @@
namespace MQTTnet.Packets
{
public class MqttPingReqPacket : MqttBasePacket
public sealed class MqttPingReqPacket : MqttBasePacket
{
// This is a minor performance improvement.
public static MqttPingReqPacket Instance = new MqttPingReqPacket();

public override string ToString()
{
return "PingReq";


+ 4
- 1
Source/MQTTnet/Packets/MqttPingRespPacket.cs 查看文件

@@ -1,7 +1,10 @@
namespace MQTTnet.Packets
{
public class MqttPingRespPacket : MqttBasePacket
public sealed class MqttPingRespPacket : MqttBasePacket
{
// This is a minor performance improvement.
public static MqttPingRespPacket Instance = new MqttPingRespPacket();

public override string ToString()
{
return "PingResp";


+ 3
- 1
Source/MQTTnet/Packets/MqttPubAckPacket.cs 查看文件

@@ -2,8 +2,10 @@

namespace MQTTnet.Packets
{
public class MqttPubAckPacket : MqttBasePublishPacket
public sealed class MqttPubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

#region Added in MQTTv5

public MqttPubAckReasonCode? ReasonCode { get; set; }


+ 1
- 1
Source/MQTTnet/Packets/MqttPubAckPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttPubAckPacketProperties
public sealed class MqttPubAckPacketProperties
{
public string ReasonString { get; set; }



+ 3
- 1
Source/MQTTnet/Packets/MqttPubCompPacket.cs 查看文件

@@ -2,8 +2,10 @@

namespace MQTTnet.Packets
{
public class MqttPubCompPacket : MqttBasePublishPacket
public sealed class MqttPubCompPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

#region Added in MQTTv5

public MqttPubCompReasonCode? ReasonCode { get; set; }


+ 1
- 1
Source/MQTTnet/Packets/MqttPubCompPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttPubCompPacketProperties
public sealed class MqttPubCompPacketProperties
{
public string ReasonString { get; set; }



+ 3
- 1
Source/MQTTnet/Packets/MqttPubRecPacket.cs 查看文件

@@ -2,8 +2,10 @@

namespace MQTTnet.Packets
{
public class MqttPubRecPacket : MqttBasePublishPacket
public sealed class MqttPubRecPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

#region Added in MQTTv5

public MqttPubRecReasonCode? ReasonCode { get; set; }


+ 1
- 1
Source/MQTTnet/Packets/MqttPubRecPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttPubRecPacketProperties
public sealed class MqttPubRecPacketProperties
{
public string ReasonString { get; set; }



+ 3
- 1
Source/MQTTnet/Packets/MqttPubRelPacket.cs 查看文件

@@ -2,8 +2,10 @@

namespace MQTTnet.Packets
{
public class MqttPubRelPacket : MqttBasePublishPacket
public sealed class MqttPubRelPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

#region Added in MQTTv5

public MqttPubRelReasonCode? ReasonCode { get; set; }


+ 1
- 1
Source/MQTTnet/Packets/MqttPubRelPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttPubRelPacketProperties
public sealed class MqttPubRelPacketProperties
{
public string ReasonString { get; set; }



+ 3
- 1
Source/MQTTnet/Packets/MqttPublishPacket.cs 查看文件

@@ -2,8 +2,10 @@

namespace MQTTnet.Packets
{
public class MqttPublishPacket : MqttBasePublishPacket
public sealed class MqttPublishPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

public bool Retain { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }


+ 1
- 1
Source/MQTTnet/Packets/MqttPublishPacketProperties.cs 查看文件

@@ -3,7 +3,7 @@ using MQTTnet.Protocol;

namespace MQTTnet.Packets
{
public class MqttPublishPacketProperties
public sealed class MqttPublishPacketProperties
{
public MqttPayloadFormatIndicator? PayloadFormatIndicator { get; set; }



+ 2
- 2
Source/MQTTnet/Packets/MqttSubAckPacket.cs 查看文件

@@ -4,9 +4,9 @@ using MQTTnet.Protocol;

namespace MQTTnet.Packets
{
public class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort? PacketIdentifier { get; set; }
public ushort PacketIdentifier { get; set; }

public List<MqttSubscribeReturnCode> ReturnCodes { get; set; } = new List<MqttSubscribeReturnCode>();



+ 1
- 1
Source/MQTTnet/Packets/MqttSubAckPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttSubAckPacketProperties
public sealed class MqttSubAckPacketProperties
{
public string ReasonString { get; set; }



+ 2
- 2
Source/MQTTnet/Packets/MqttSubscribePacket.cs 查看文件

@@ -3,9 +3,9 @@ using System.Linq;

namespace MQTTnet.Packets
{
public class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort? PacketIdentifier { get; set; }
public ushort PacketIdentifier { get; set; }

public List<MqttTopicFilter> TopicFilters { get; set; } = new List<MqttTopicFilter>();



+ 1
- 1
Source/MQTTnet/Packets/MqttSubscribePacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttSubscribePacketProperties
public sealed class MqttSubscribePacketProperties
{
public uint? SubscriptionIdentifier { get; set; }



+ 2
- 2
Source/MQTTnet/Packets/MqttUnsubAckPacket.cs 查看文件

@@ -4,9 +4,9 @@ using MQTTnet.Protocol;

namespace MQTTnet.Packets
{
public class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort? PacketIdentifier { get; set; }
public ushort PacketIdentifier { get; set; }

#region Added in MQTTv5



+ 1
- 1
Source/MQTTnet/Packets/MqttUnsubAckPacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttUnsubAckPacketProperties
public sealed class MqttUnsubAckPacketProperties
{
public string ReasonString { get; set; }



+ 2
- 2
Source/MQTTnet/Packets/MqttUnsubscribePacket.cs 查看文件

@@ -2,9 +2,9 @@

namespace MQTTnet.Packets
{
public class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort? PacketIdentifier { get; set; }
public ushort PacketIdentifier { get; set; }

public List<string> TopicFilters { get; set; } = new List<string>();



+ 1
- 1
Source/MQTTnet/Packets/MqttUnsubscribePacketProperties.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttUnsubscribePacketProperties
public sealed class MqttUnsubscribePacketProperties
{
public List<MqttUserProperty> UserProperties { get; set; }
}


+ 1
- 1
Source/MQTTnet/Packets/MqttUserProperty.cs 查看文件

@@ -2,7 +2,7 @@

namespace MQTTnet.Packets
{
public class MqttUserProperty
public sealed class MqttUserProperty
{
public MqttUserProperty(string name, string value)
{


+ 1
- 0
Source/MQTTnet/Server/IMqttServerOptions.cs 查看文件

@@ -12,6 +12,7 @@ namespace MQTTnet.Server
MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }

TimeSpan DefaultCommunicationTimeout { get; }
TimeSpan KeepAliveMonitorInterval { get; }

IMqttServerConnectionValidator ConnectionValidator { get; }
IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; }


+ 16
- 36
Source/MQTTnet/Server/MqttClientConnection.cs 查看文件

@@ -23,7 +23,6 @@ namespace MQTTnet.Server
readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();

readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
readonly MqttClientSessionsManager _sessionsManager;

readonly IMqttNetScopedLogger _logger;
@@ -36,7 +35,6 @@ namespace MQTTnet.Server
readonly DateTime _connectedTimestamp;

volatile Task _packageReceiverTask;
DateTime _lastPacketReceivedTimestamp;
DateTime _lastNonKeepAlivePacketReceivedTimestamp;

long _receivedPacketsCount;
@@ -69,17 +67,19 @@ namespace MQTTnet.Server
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttClientConnection));

_keepAliveMonitor = new MqttClientKeepAliveMonitor(ConnectPacket.ClientId, () => StopAsync(), logger);

_connectedTimestamp = DateTime.UtcNow;
_lastPacketReceivedTimestamp = _connectedTimestamp;
_lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp;
LastPacketReceivedTimestamp = _connectedTimestamp;
_lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}

public MqttConnectPacket ConnectPacket { get; }

public string ClientId => ConnectPacket.ClientId;

public bool IsReadingPacket => _channelAdapter.IsReadingPacket;

public DateTime LastPacketReceivedTimestamp { get; private set; }

public MqttClientSession Session { get; }

public Task StopAsync(bool isTakeover = false)
@@ -115,7 +115,7 @@ namespace MQTTnet.Server
status.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount);

status.ConnectedTimestamp = _connectedTimestamp;
status.LastPacketReceivedTimestamp = _lastPacketReceivedTimestamp;
status.LastPacketReceivedTimestamp = LastPacketReceivedTimestamp;
status.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp;

status.BytesSent = _channelAdapter.BytesSent;
@@ -140,16 +140,10 @@ namespace MQTTnet.Server
{
_logger.Info("Client '{0}': Session started.", ClientId);

_channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
_channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted;

Session.WillMessage = ConnectPacket.WillMessage;

Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token).Forget(_logger);

// TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor.
_keepAliveMonitor.Start(ConnectPacket.KeepAlivePeriod, _cancellationToken.Token);

await SendAsync(_channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(_connectionValidatorContext)).ConfigureAwait(false);

Session.IsCleanSession = false;
@@ -164,15 +158,13 @@ namespace MQTTnet.Server
}

Interlocked.Increment(ref _sentPacketsCount);
_lastPacketReceivedTimestamp = DateTime.UtcNow;
LastPacketReceivedTimestamp = DateTime.UtcNow;

if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket))
{
_lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp;
_lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}

_keepAliveMonitor.PacketReceived();

if (packet is MqttPublishPacket publishPacket)
{
await HandleIncomingPublishPacketAsync(publishPacket).ConfigureAwait(false);
@@ -252,9 +244,6 @@ namespace MQTTnet.Server

_packetDispatcher.Reset();

_channelAdapter.ReadingPacketStartedCallback = null;
_channelAdapter.ReadingPacketCompletedCallback = null;

_packageReceiverTask = null;

if (_isTakeover)
@@ -302,10 +291,10 @@ namespace MQTTnet.Server

async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket)
{
// TODO: Let the channel adapter create the packet.
var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false);
var subAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateSubAckPacket(subscribePacket, subscribeResult);

await SendAsync(subscribeResult.ResponsePacket).ConfigureAwait(false);
await SendAsync(subAckPacket).ConfigureAwait(false);

if (subscribeResult.CloseConnection)
{
@@ -318,9 +307,10 @@ namespace MQTTnet.Server

async Task HandleIncomingUnsubscribePacketAsync(MqttUnsubscribePacket unsubscribePacket)
{
// TODO: Let the channel adapter create the packet.
var unsubscribeResult = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
await SendAsync(unsubscribeResult).ConfigureAwait(false);
var reasonCodes = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
var unsubAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateUnsubAckPacket(unsubscribePacket, reasonCodes);

await SendAsync(unsubAckPacket).ConfigureAwait(false);
}

Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
@@ -389,7 +379,7 @@ namespace MQTTnet.Server
{
while (!cancellationToken.IsCancellationRequested)
{
queuedApplicationMessage = await Session.ApplicationMessagesQueue.TakeAsync(cancellationToken).ConfigureAwait(false);
queuedApplicationMessage = await Session.ApplicationMessagesQueue.DequeueAsync(cancellationToken).ConfigureAwait(false);
if (queuedApplicationMessage == null)
{
return;
@@ -503,15 +493,5 @@ namespace MQTTnet.Server
Interlocked.Increment(ref _receivedApplicationMessagesCount);
}
}

void OnAdapterReadingPacketCompleted()
{
_keepAliveMonitor?.Resume();
}

void OnAdapterReadingPacketStarted()
{
_keepAliveMonitor?.Pause();
}
}
}

+ 0
- 93
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs 查看文件

@@ -1,93 +0,0 @@
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public sealed class MqttClientKeepAliveMonitor
{
readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();

readonly string _clientId;
readonly Func<Task> _keepAliveElapsedCallback;
readonly IMqttNetScopedLogger _logger;

bool _isPaused;

public MqttClientKeepAliveMonitor(string clientId, Func<Task> keepAliveElapsedCallback, IMqttNetLogger logger)
{
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback));

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttClientKeepAliveMonitor));
}

public void Start(int keepAlivePeriod, CancellationToken cancellationToken)
{
if (keepAlivePeriod == 0)
{
return;
}

Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger);
}

public void Pause()
{
_isPaused = true;
}

public void Resume()
{
_isPaused = false;
}

public void PacketReceived()
{
_lastPacketReceivedTracker.Restart();
}

async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)
{
try
{
_lastPacketReceivedTracker.Restart();

while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
// If the client sends 5 sec. the server will allow up to 7.5 seconds.
// If the client sends 1 sec. the server will allow up to 1.5 seconds.
if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D)
{
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
await _keepAliveElapsedCallback().ConfigureAwait(false);

return;
}

// The server checks the keep alive timeout every 50 % of the overall keep alive timeout
// because the server allows 1.5 times the keep alive value. This means that a value of 5 allows
// up to 7.5 seconds. With an interval of 2.5 (5 / 2) the 7.5 is also affected. Waiting the whole
// keep alive time will hit at 10 instead of 7.5 (but only one time instead of two times).
await Task.Delay(TimeSpan.FromSeconds(keepAlivePeriod * 0.5D), cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
}
finally
{
_logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientId);
}
}
}
}

+ 1
- 1
Source/MQTTnet/Server/MqttClientSession.cs 查看文件

@@ -6,7 +6,7 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttClientSession
public sealed class MqttClientSession
{
readonly IMqttNetScopedLogger _logger;



+ 19
- 24
Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs 查看文件

@@ -6,11 +6,11 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttClientSessionApplicationMessagesQueue : Disposable
public sealed class MqttClientSessionApplicationMessagesQueue : IDisposable
{
private readonly AsyncQueue<MqttQueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttQueuedApplicationMessage>();
private readonly IMqttServerOptions _options;
readonly AsyncQueue<MqttQueuedApplicationMessage> _messageQueue = new AsyncQueue<MqttQueuedApplicationMessage>();
readonly IMqttServerOptions _options;

public MqttClientSessionApplicationMessagesQueue(IMqttServerOptions options)
{
@@ -32,22 +32,6 @@ namespace MQTTnet.Server
});
}

public void Clear()
{
_messageQueue.Clear();
}

public async Task<MqttQueuedApplicationMessage> TakeAsync(CancellationToken cancellationToken)
{
var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
if (!dequeueResult.IsSuccess)
{
return null;
}

return dequeueResult.Item;
}

public void Enqueue(MqttQueuedApplicationMessage queuedApplicationMessage)
{
if (queuedApplicationMessage == null) throw new ArgumentNullException(nameof(queuedApplicationMessage));
@@ -71,14 +55,25 @@ namespace MQTTnet.Server
}
}

protected override void Dispose(bool disposing)
public async Task<MqttQueuedApplicationMessage> DequeueAsync(CancellationToken cancellationToken)
{
if (disposing)
var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
if (!dequeueResult.IsSuccess)
{
_messageQueue.Dispose();
return null;
}

base.Dispose(disposing);
return dequeueResult.Item;
}

public void Clear()
{
_messageQueue.Clear();
}

public void Dispose()
{
_messageQueue?.Dispose();
}
}
}

+ 6
- 0
Source/MQTTnet/Server/MqttClientSessionsManager.cs 查看文件

@@ -9,6 +9,7 @@ using MQTTnet.Server.Status;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

@@ -63,6 +64,11 @@ namespace MQTTnet.Server
}
}

public List<MqttClientConnection> GetConnections()
{
return _connections.Values.ToList();
}

public Task HandleClientConnectionAsync(IMqttChannelAdapter clientAdapter)
{
if (clientAdapter is null) throw new ArgumentNullException(nameof(clientAdapter));


+ 6
- 3
Source/MQTTnet/Server/MqttClientSubscribeResult.cs 查看文件

@@ -1,10 +1,13 @@
using MQTTnet.Packets;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public class MqttClientSubscribeResult
public sealed class MqttClientSubscribeResult
{
public MqttSubAckPacket ResponsePacket { get; set; }
public List<MqttSubscribeReturnCode> ReturnCodes { get; } = new List<MqttSubscribeReturnCode>();

public List<MqttSubscribeReasonCode> ReasonCodes { get; } = new List<MqttSubscribeReasonCode>();

public bool CloseConnection { get; set; }
}


+ 21
- 32
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs 查看文件

@@ -7,12 +7,12 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttClientSubscriptionsManager
public sealed class MqttClientSubscriptionsManager
{
private readonly Dictionary<string, MqttTopicFilter> _subscriptions = new Dictionary<string, MqttTopicFilter>();
private readonly MqttClientSession _clientSession;
private readonly IMqttServerOptions _serverOptions;
private readonly MqttServerEventDispatcher _eventDispatcher;
readonly Dictionary<string, MqttTopicFilter> _subscriptions = new Dictionary<string, MqttTopicFilter>();
readonly MqttClientSession _clientSession;
readonly IMqttServerOptions _serverOptions;
readonly MqttServerEventDispatcher _eventDispatcher;

public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions)
{
@@ -28,15 +28,7 @@ namespace MQTTnet.Server
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));

var result = new MqttClientSubscribeResult
{
ResponsePacket = new MqttSubAckPacket
{
PacketIdentifier = subscribePacket.PacketIdentifier
},

CloseConnection = false
};
var result = new MqttClientSubscribeResult();

foreach (var originalTopicFilter in subscribePacket.TopicFilters)
{
@@ -46,13 +38,13 @@ namespace MQTTnet.Server

if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !interceptorContext.AcceptSubscription)
{
result.ResponsePacket.ReturnCodes.Add(MqttSubscribeReturnCode.Failure);
result.ResponsePacket.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError);
result.ReturnCodes.Add(MqttSubscribeReturnCode.Failure);
result.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError);
}
else
{
result.ResponsePacket.ReturnCodes.Add(ConvertToSubscribeReturnCode(finalTopicFilter.QualityOfServiceLevel));
result.ResponsePacket.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel));
result.ReturnCodes.Add(ConvertToSubscribeReturnCode(finalTopicFilter.QualityOfServiceLevel));
result.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel));
}

if (interceptorContext.CloseConnection)
@@ -98,21 +90,18 @@ namespace MQTTnet.Server
}
}

public async Task<MqttUnsubAckPacket> UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
public async Task<List<MqttUnsubscribeReasonCode>> UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
{
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));

var unsubAckPacket = new MqttUnsubAckPacket
{
PacketIdentifier = unsubscribePacket.PacketIdentifier
};
var reasonCodes = new List<MqttUnsubscribeReasonCode>();

foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptUnsubscription)
{
unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.ImplementationSpecificError);
reasonCodes.Add(MqttUnsubscribeReasonCode.ImplementationSpecificError);
continue;
}

@@ -120,11 +109,11 @@ namespace MQTTnet.Server
{
if (_subscriptions.Remove(topicFilter))
{
unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.Success);
reasonCodes.Add(MqttUnsubscribeReasonCode.Success);
}
else
{
unsubAckPacket.ReasonCodes.Add(MqttUnsubscribeReasonCode.NoSubscriptionExisted);
reasonCodes.Add(MqttUnsubscribeReasonCode.NoSubscriptionExisted);
}
}
}
@@ -134,7 +123,7 @@ namespace MQTTnet.Server
await _eventDispatcher.SafeNotifyClientUnsubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
}

return unsubAckPacket;
return reasonCodes;
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
@@ -184,7 +173,7 @@ namespace MQTTnet.Server
return CreateSubscriptionResult(qosLevel, qosLevels);
}

private static MqttSubscribeReturnCode ConvertToSubscribeReturnCode(MqttQualityOfServiceLevel qualityOfServiceLevel)
static MqttSubscribeReturnCode ConvertToSubscribeReturnCode(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
switch (qualityOfServiceLevel)
{
@@ -195,7 +184,7 @@ namespace MQTTnet.Server
}
}

private static MqttSubscribeReasonCode ConvertToSubscribeReasonCode(MqttQualityOfServiceLevel qualityOfServiceLevel)
static MqttSubscribeReasonCode ConvertToSubscribeReasonCode(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
switch (qualityOfServiceLevel)
{
@@ -206,7 +195,7 @@ namespace MQTTnet.Server
}
}

private async Task<MqttSubscriptionInterceptorContext> InterceptSubscribeAsync(MqttTopicFilter topicFilter)
async Task<MqttSubscriptionInterceptorContext> InterceptSubscribeAsync(MqttTopicFilter topicFilter)
{
var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items);
if (_serverOptions.SubscriptionInterceptor != null)
@@ -217,7 +206,7 @@ namespace MQTTnet.Server
return context;
}

private async Task<MqttUnsubscriptionInterceptorContext> InterceptUnsubscribeAsync(string topicFilter)
async Task<MqttUnsubscriptionInterceptorContext> InterceptUnsubscribeAsync(string topicFilter)
{
var context = new MqttUnsubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items);
if (_serverOptions.UnsubscriptionInterceptor != null)
@@ -228,7 +217,7 @@ namespace MQTTnet.Server
return context;
}

private static CheckSubscriptionsResult CreateSubscriptionResult(MqttQualityOfServiceLevel qosLevel, HashSet<MqttQualityOfServiceLevel> subscribedQoSLevels)
static CheckSubscriptionsResult CreateSubscriptionResult(MqttQualityOfServiceLevel qosLevel, HashSet<MqttQualityOfServiceLevel> subscribedQoSLevels)
{
MqttQualityOfServiceLevel effectiveQoS;
if (subscribedQoSLevels.Contains(qosLevel))


+ 115
- 0
Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs 查看文件

@@ -0,0 +1,115 @@
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public sealed class MqttServerKeepAliveMonitor
{
readonly IMqttServerOptions _options;
readonly MqttClientSessionsManager _sessionsManager;
readonly IMqttNetScopedLogger _logger;

public MqttServerKeepAliveMonitor(IMqttServerOptions options, MqttClientSessionsManager sessionsManager, IMqttNetLogger logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttServerKeepAliveMonitor));
}

public void Start(CancellationToken cancellationToken)
{
// The keep alive monitor spawns a real new thread (LongRunning) because it does not
// support async/await. Async etc. is avoided here because the thread will usually check
// the connections every few milliseconds and thus the context changes (due to async) are
// only consuming resources. Also there is just 1 thread for the entire server which is fine at all!
Task.Factory.StartNew(_ => DoWork(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning).Forget(_logger);
}

void DoWork(CancellationToken cancellationToken)
{
try
{
_logger.Info("Starting keep alive monitor.");

while (!cancellationToken.IsCancellationRequested)
{
TryMaintainConnections();
PlatformAbstractionLayer.Sleep(_options.KeepAliveMonitorInterval);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while checking keep alive timeouts.");
}
finally
{
_logger.Verbose("Stopped checking keep alive timeout.");
}
}

void TryMaintainConnections()
{
var now = DateTime.UtcNow;
foreach (var connection in _sessionsManager.GetConnections())
{
TryMaintainConnection(connection, now);
}
}

void TryMaintainConnection(MqttClientConnection connection, DateTime now)
{
try
{
//if (connection.IsStopped)
//{
// // The connection is already dead so there is no need to check it.
// return;
//}

if (connection.ConnectPacket.KeepAlivePeriod == 0)
{
// The keep alive feature is not used by the current connection.
return;
}

if (connection.IsReadingPacket)
{
// The connection is currently reading a (large) packet. So it is obviously
// doing something and thus "connected".
return;
}

// Values described here: [MQTT-3.1.2-24].
// If the client sends 5 sec. the server will allow up to 7.5 seconds.
// If the client sends 1 sec. the server will allow up to 1.5 seconds.
var maxDurationWithoutPacket = connection.ConnectPacket.KeepAlivePeriod * 1.5D;

var secondsWithoutPackage = (now - connection.LastPacketReceivedTimestamp).TotalSeconds;
if (secondsWithoutPackage < maxDurationWithoutPacket)
{
// A packet was received before the timeout is affected.
return;
}

_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", connection.ClientId);

// Execute the disconnection in background so that the keep alive monitor can continue
// with checking other connections.
Task.Run(() => connection.StopAsync());
}
catch (Exception exception)
{
_logger.Error(exception, "Client {0}: Unhandled exception while checking keep alive timeouts.", connection.ClientId);
}
}
}
}

+ 2
- 0
Source/MQTTnet/Server/MqttServerOptions.cs 查看文件

@@ -18,6 +18,8 @@ namespace MQTTnet.Server

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);

public TimeSpan KeepAliveMonitorInterval { get; set; } = TimeSpan.FromMilliseconds(500);

public IMqttServerConnectionValidator ConnectionValidator { get; set; }

public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; }


+ 61
- 2
Tests/MQTTnet.Core.Tests/AsyncLock_Tests.cs 查看文件

@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Internal;
@@ -9,7 +10,65 @@ namespace MQTTnet.Tests
public class AsyncLock_Tests
{
[TestMethod]
public void AsyncLock()
public async Task Lock_Serial_Calls()
{
var sum = 0;

var @lock = new AsyncLock();
for (var i = 0; i < 100; i++)
{
using (await @lock.WaitAsync().ConfigureAwait(false))
{
sum++;
}
}

Assert.AreEqual(100, sum);
}

[TestMethod]
[ExpectedException(typeof(TaskCanceledException))]
public async Task Test_Cancellation()
{
var @lock = new AsyncLock();

// This call will never "release" the lock due to missing _using_.
await @lock.WaitAsync().ConfigureAwait(false);

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await @lock.WaitAsync(cts.Token).ConfigureAwait(false);
}
}

//[TestMethod]
//public async Task Test_Cancellation_With_Later_Access()
//{
// var @lock = new AsyncLock();

// var releaser = await @lock.WaitAsync().ConfigureAwait(false);

// try
// {
// using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
// {
// await @lock.WaitAsync(cts.Token).ConfigureAwait(false);
// }
// }
// catch (OperationCanceledException)
// {
// }
// releaser.Dispose();

// using (await @lock.WaitAsync().ConfigureAwait(false))
// {
// // When the method finished, the thread got access.
// }
//}

[TestMethod]
public void Lock_10_Parallel_Tasks()
{
const int ThreadsCount = 10;



+ 47
- 3
Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs 查看文件

@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Internal;
@@ -36,9 +37,22 @@ namespace MQTTnet.Tests
Assert.AreEqual(3, queue.Count);
}

[TestMethod]
public async Task Cancellation()
{
var queue = new AsyncQueue<int>();

bool success;
using (var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
success = (await queue.TryDequeueAsync(cancellationTokenSource.Token)).IsSuccess;
}

Assert.AreEqual(false, success);
}

[TestMethod]
public async Task Preserve_ProcessAsync()
public async Task Process_Async()
{
var queue = new AsyncQueue<int>();

@@ -50,7 +64,37 @@ namespace MQTTnet.Tests
sum += (await queue.TryDequeueAsync(CancellationToken.None)).Item;
}
});

queue.Enqueue(1);
await Task.Delay(500);

queue.Enqueue(2);
await Task.Delay(500);

queue.Enqueue(3);
await Task.Delay(500);

Assert.AreEqual(6, sum);
Assert.AreEqual(TaskStatus.RanToCompletion, worker.Status);
}

[TestMethod]
public async Task Process_Async_With_Initial_Delay()
{
var queue = new AsyncQueue<int>();

var sum = 0;
var worker = Task.Run(async () =>
{
while (sum < 6)
{
sum += (await queue.TryDequeueAsync(CancellationToken.None)).Item;
}
});

// This line is the diff to test _Process_Async_
await Task.Delay(500);

queue.Enqueue(1);
await Task.Delay(500);



+ 39
- 4
Tests/MQTTnet.Core.Tests/LowLevelMqttClient_Tests.cs 查看文件

@@ -1,10 +1,12 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client.Options;
using MQTTnet.LowLevelClient;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Tests.Mockups;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -34,7 +36,7 @@ namespace MQTTnet.Tests
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();
@@ -50,7 +52,7 @@ namespace MQTTnet.Tests
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();
@@ -71,7 +73,7 @@ namespace MQTTnet.Tests
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
await testEnvironment.StartServerAsync();

var factory = new MqttFactory();
var lowLevelClient = factory.CreateLowLevelMqttClient();
@@ -89,6 +91,39 @@ namespace MQTTnet.Tests
}
}

[TestMethod]
public async Task Loose_Connection()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.ServerPort = 8364;
var server = await testEnvironment.StartServerAsync();
var client = await testEnvironment.ConnectLowLevelClientAsync(o => o.WithCommunicationTimeout(TimeSpan.Zero));

await Authenticate(client).ConfigureAwait(false);

await server.StopAsync();

await Task.Delay(1000);

try
{
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
Assert.IsTrue(exception.InnerException is SocketException);
return;
}
catch
{
Assert.Fail("Wrong exception type thrown.");
}

Assert.Fail("This MUST fail");
}
}

async Task<MqttConnAckPacket> Authenticate(ILowLevelMqttClient client)
{
await client.SendAsync(new MqttConnectPacket


+ 22
- 1
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs 查看文件

@@ -6,7 +6,9 @@ using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.LowLevelClient;

namespace MQTTnet.Tests.Mockups
{
@@ -100,6 +102,25 @@ namespace MQTTnet.Tests.Mockups
return ConnectClientAsync(new MqttClientOptionsBuilder());
}

public Task<ILowLevelMqttClient> ConnectLowLevelClientAsync()
{
return ConnectLowLevelClientAsync(o => {});
}

public async Task<ILowLevelMqttClient> ConnectLowLevelClientAsync(Action<MqttClientOptionsBuilder> optionsBuilder)
{
if (optionsBuilder == null) throw new ArgumentNullException(nameof(optionsBuilder));

var options = new MqttClientOptionsBuilder();
options = options.WithTcpServer("127.0.0.1", ServerPort);
optionsBuilder.Invoke(options);

var client = new MqttFactory().CreateLowLevelMqttClient();
await client.ConnectAsync(options.Build(), CancellationToken.None).ConfigureAwait(false);

return client;
}

public async Task<IMqttClient> ConnectClientAsync(MqttClientOptionsBuilder options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
@@ -111,7 +132,7 @@ namespace MQTTnet.Tests.Mockups

return client;
}
public async Task<IMqttClient> ConnectClientAsync(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));


+ 2
- 3
Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs 查看文件

@@ -11,7 +11,7 @@ namespace MQTTnet.Tests.Mockups
{
public class TestMqttCommunicationAdapter : IMqttChannelAdapter
{
private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();
readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();

public TestMqttCommunicationAdapter Partner { get; set; }

@@ -26,8 +26,7 @@ namespace MQTTnet.Tests.Mockups
public long BytesSent { get; }
public long BytesReceived { get; }

public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; }
public bool IsReadingPacket { get; }

public void Dispose()
{


+ 0
- 118
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs 查看文件

@@ -1,118 +0,0 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Tests
{
[TestClass]
public class MqttKeepAliveMonitor_Tests
{
[TestMethod]
public async Task KeepAlive_Timeout()
{
var counter = 0;

var monitor = new MqttClientKeepAliveMonitor("", () =>
{
counter++;
return Task.CompletedTask;
},
new MqttNetLogger());

Assert.AreEqual(0, counter);

monitor.Start(1, CancellationToken.None);

Assert.AreEqual(0, counter);

await Task.Delay(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.

Assert.AreEqual(1, counter);
}

[TestMethod]
public async Task KeepAlive_NoTimeout()
{
var counter = 0;

var monitor = new MqttClientKeepAliveMonitor("", () =>
{
counter++;
return Task.CompletedTask;
},
new MqttNetLogger());

Assert.AreEqual(0, counter);

monitor.Start(1, CancellationToken.None);

Assert.AreEqual(0, counter);

// Simulate traffic.
await Task.Delay(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
monitor.PacketReceived();
await Task.Delay(1000);
monitor.PacketReceived();
await Task.Delay(1000);

Assert.AreEqual(0, counter);

await Task.Delay(2000);

Assert.AreEqual(1, counter);
}

private class TestClientSession : IMqttClientSession
{
public string ClientId { get; }

public int StopCalledCount { get; private set; }

public void FillStatus(MqttClientStatus status)
{
throw new NotSupportedException();
}

public void EnqueueApplicationMessage(MqttClientConnection senderClientSession, MqttApplicationMessage applicationMessage)
{
throw new NotSupportedException();
}

public void ClearPendingApplicationMessages()
{
throw new NotSupportedException();
}

public Task RunAsync(MqttApplicationMessage willMessage, int keepAlivePeriod, IMqttChannelAdapter adapter)
{
throw new NotSupportedException();
}

public Task StopAsync()
{
StopCalledCount++;
return Task.FromResult(0);
}

public Task SubscribeAsync(IList<MqttTopicFilter> topicFilters)
{
throw new NotSupportedException();
}

public Task UnsubscribeAsync(IList<string> topicFilters)
{
throw new NotSupportedException();
}

public void Dispose()
{
}
}
}
}

+ 78
- 0
Tests/MQTTnet.Core.Tests/Server_Connection_Tests.cs 查看文件

@@ -0,0 +1,78 @@
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
{
[TestClass]
public sealed class Server_Connection_Tests
{
[TestMethod]
public async Task Close_Idle_Connection_On_Connect()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

// Don't send anything. The server should close the connection.
await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
}

Assert.Fail("Receive should throw an exception.");
}
catch (SocketException)
{
}
}
}

[TestMethod]
public async Task Send_Garbage()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

// Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
// forever. This is security related.
var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

var buffer = Encoding.UTF8.GetBytes("Garbage");
await client.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);

await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
}

Assert.Fail("Receive should throw an exception.");
}
catch (SocketException)
{
}
}
}
}
}

+ 58
- 0
Tests/MQTTnet.Core.Tests/Server_KeepAlive_Tests.cs 查看文件

@@ -0,0 +1,58 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Packets;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
{
[TestClass]
public sealed class Server_KeepAlive_Tests
{
[TestMethod]
public async Task Disconnect_Client_DueTo_KeepAlive()
{
using (var testEnvironment = new TestEnvironment())
{
var server = await testEnvironment.StartServerAsync();

var client = await testEnvironment.ConnectLowLevelClientAsync(o => o.WithCommunicationTimeout(TimeSpan.FromSeconds(2))).ConfigureAwait(false);

await client.SendAsync(new MqttConnectPacket
{
CleanSession = true,
ClientId = "abc",
KeepAlivePeriod = 1,
}, CancellationToken.None).ConfigureAwait(false);

var response = await client.ReceiveAsync(CancellationToken.None).ConfigureAwait(false);

Assert.IsTrue(response is MqttConnAckPacket);

await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);

// If we reach this point everything works as expected (server did not close the connection
// due to proper ping messages.
// Now we will wait 1.2 seconds because the server MUST wait 1.5 seconds in total (See spec).

await Task.Delay(1200);
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);

// Now we will wait longer than 1.5 so that the server will close the connection.

await Task.Delay(3000);

await server.StopAsync();

await client.ReceiveAsync(CancellationToken.None);
}
}
}
}

+ 0
- 2
Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs 查看文件

@@ -6,7 +6,6 @@ using MQTTnet.Tests.Mockups;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System.Threading;

namespace MQTTnet.Tests
{
@@ -63,7 +62,6 @@ namespace MQTTnet.Tests
var clientStatus = await server.GetClientStatusAsync();

Assert.AreEqual(1, clientStatus.Count);
Assert.IsTrue(clientStatus.Any(s => s.ClientId == c1.Options.ClientId));

await clientStatus.First().DisconnectAsync();


+ 3
- 67
Tests/MQTTnet.Core.Tests/Server_Tests.cs 查看文件

@@ -6,14 +6,12 @@ using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using MQTTnet.Implementations;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -21,7 +19,7 @@ using System.Threading.Tasks;
namespace MQTTnet.Tests
{
[TestClass]
public class Server_Tests
public sealed class Server_Tests
{
public TestContext TestContext { get; set; }

@@ -331,7 +329,7 @@ namespace MQTTnet.Tests

await Task.Delay(10);
}
var c2 = await testEnvironment.ConnectClientAsync();

var messageBuilder = new MqttApplicationMessageBuilder();
@@ -1219,68 +1217,6 @@ namespace MQTTnet.Tests
}
}

[TestMethod]
public async Task Close_Idle_Connection()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

// Don't send anything. The server should close the connection.
await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
}

Assert.Fail("Receive should throw an exception.");
}
catch (SocketException)
{
}
}
}

[TestMethod]
public async Task Send_Garbage()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

// Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
// forever. This is security related.
var client = new CrossPlatformSocket(AddressFamily.InterNetwork);
await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

var buffer = Encoding.UTF8.GetBytes("Garbage");
await client.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);

await Task.Delay(TimeSpan.FromSeconds(3));

try
{
var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
if (receivedBytes == 0)
{
return;
}

Assert.Fail("Receive should throw an exception.");
}
catch (SocketException)
{
}
}
}

[TestMethod]
public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
{
@@ -1471,7 +1407,7 @@ namespace MQTTnet.Tests

await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);

await client.PublishAsync(new MqttApplicationMessage{ Topic = topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
await client.PublishAsync(new MqttApplicationMessage { Topic = topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });

await Task.Delay(500);



Loading…
取消
儲存