瀏覽代碼

Improve internal server connection management. (#1232)

release/3.x.x
Christian 3 年之前
committed by GitHub
父節點
當前提交
cd8940f9b7
沒有發現已知的金鑰在資料庫的簽署中 GPG Key ID: 4AEE18F83AFDEB23
共有 60 個文件被更改,包括 967 次插入576 次删除
  1. +4
    -0
      Build/MQTTnet.nuspec
  2. +3
    -3
      Source/MQTTnet/Client/MqttClient.cs
  3. +12
    -2
      Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
  4. +4
    -3
      Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs
  5. +1
    -2
      Source/MQTTnet/Formatter/IMqttDataConverter.cs
  6. +1
    -0
      Source/MQTTnet/MqttTopicFilter.cs
  7. +2
    -0
      Source/MQTTnet/Protocol/MqttRetainHandling.cs
  8. +1
    -1
      Source/MQTTnet/Server/Internal/CheckSubscriptionsResult.cs
  9. +228
    -248
      Source/MQTTnet/Server/Internal/MqttClientConnection.cs
  10. +90
    -0
      Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs
  11. +5
    -5
      Source/MQTTnet/Server/Internal/MqttClientSession.cs
  12. +4
    -4
      Source/MQTTnet/Server/Internal/MqttClientSessionApplicationMessagesQueue.cs
  13. +118
    -101
      Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
  14. +39
    -24
      Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs
  15. +4
    -4
      Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs
  16. +29
    -8
      Source/MQTTnet/Server/Internal/MqttServerEventDispatcher.cs
  17. +7
    -7
      Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs
  18. +1
    -1
      Source/MQTTnet/Server/Internal/MqttTopicFilterComparer.cs
  19. +4
    -12
      Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs
  20. +0
    -11
      Source/MQTTnet/Server/MqttClientConnectionStatus.cs
  21. +3
    -11
      Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs
  22. +4
    -3
      Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs
  23. +7
    -7
      Source/MQTTnet/Server/MqttConnectionValidatorContext.cs
  24. +3
    -1
      Source/MQTTnet/Server/MqttPendingApplicationMessage.cs
  25. +1
    -1
      Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs
  26. +1
    -0
      Source/MQTTnet/Server/MqttServer.cs
  27. +2
    -1
      Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs
  28. +19
    -8
      Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs
  29. +4
    -3
      Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs
  30. +4
    -11
      Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs
  31. +4
    -3
      Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs
  32. +3
    -9
      Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs
  33. +18
    -5
      Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs
  34. +3
    -9
      Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs
  35. +4
    -3
      Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs
  36. +4
    -3
      Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs
  37. +1
    -1
      Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs
  38. +1
    -0
      Source/MQTTnet/Server/MqttServerOptions.cs
  39. +4
    -3
      Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs
  40. +4
    -3
      Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs
  41. +4
    -3
      Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs
  42. +3
    -10
      Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs
  43. +4
    -11
      Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs
  44. +0
    -9
      Source/MQTTnet/Server/PrepareClientSessionResult.cs
  45. +4
    -0
      Source/MQTTnet/Server/Status/IMqttClientStatus.cs
  46. +3
    -0
      Source/MQTTnet/Server/Status/IMqttSessionStatus.cs
  47. +6
    -3
      Source/MQTTnet/Server/Status/MqttClientStatus.cs
  48. +5
    -1
      Source/MQTTnet/Server/Status/MqttSessionStatus.cs
  49. +1
    -0
      Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs
  50. +22
    -0
      Tests/MQTTnet.Core.Tests/BaseTestClass.cs
  51. +2
    -3
      Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs
  52. +9
    -5
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  53. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs
  54. +1
    -0
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs
  55. +186
    -0
      Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs
  56. +2
    -4
      Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs
  57. +42
    -5
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  58. +1
    -0
      Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs
  59. +5
    -0
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  60. +15
    -0
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs

+ 4
- 0
Build/MQTTnet.nuspec 查看文件

@@ -13,6 +13,10 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [ManagedClient] Extended ReconnectAsync (thanks to @nvsnkv, #1202).
* [Server] Fixed a memory/performance leak when using QoS Level 1.
* [Server] Exposed connection timestamp in client status.
* [Server] Refactored connection management code.
* [Server] Exposed more details in _MqttServerClientConnectedEventArgs_.
* [MQTTnet.Server] Moved server project to a dedicated GitHub repository.
* [MQTTnet, MQTTnet.Extensions.ManagedClient] Fixed bug that allowed invalid subscriptions (Thanks to @marcelwinh).
Git commit: $gitCommit


+ 3
- 3
Source/MQTTnet/Client/MqttClient.cs 查看文件

@@ -363,9 +363,9 @@ namespace MQTTnet.Client

await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false);
}
catch (Exception e)
catch (Exception innerException)
{
_logger.Warning(e, "Error while waiting for internal tasks.");
_logger.Warning(innerException, "Error while waiting for internal tasks.");
}
finally
{
@@ -439,7 +439,7 @@ namespace MQTTnet.Client
{
if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning(null, "Timeout while waiting for response packet ({0}).", typeof(TResponsePacket).Name);
_logger.Warning("Timeout while waiting for response packet ({0}).", typeof(TResponsePacket).Name);
}

throw;


+ 12
- 2
Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs 查看文件

@@ -133,7 +133,7 @@ namespace MQTTnet.Client.Options
return this;
}

public MqttClientOptionsBuilder WithCredentials(string username, string password = null)
public MqttClientOptionsBuilder WithCredentials(string username, string password)
{
byte[] passwordBuffer = null;

@@ -145,7 +145,7 @@ namespace MQTTnet.Client.Options
return WithCredentials(username, passwordBuffer);
}

public MqttClientOptionsBuilder WithCredentials(string username, byte[] password = null)
public MqttClientOptionsBuilder WithCredentials(string username, byte[] password)
{
_options.Credentials = new MqttClientCredentials
{
@@ -155,6 +155,16 @@ namespace MQTTnet.Client.Options

return this;
}
public MqttClientOptionsBuilder WithCredentials(string username)
{
_options.Credentials = new MqttClientCredentials
{
Username = username
};

return this;
}

public MqttClientOptionsBuilder WithCredentials(IMqttClientCredentials credentials)
{


+ 4
- 3
Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Client.Receiving
{
public class MqttApplicationMessageReceivedHandlerDelegate : IMqttApplicationMessageReceivedHandler
public sealed class MqttApplicationMessageReceivedHandlerDelegate : IMqttApplicationMessageReceivedHandler
{
private readonly Func<MqttApplicationMessageReceivedEventArgs, Task> _handler;
readonly Func<MqttApplicationMessageReceivedEventArgs, Task> _handler;

public MqttApplicationMessageReceivedHandlerDelegate(Action<MqttApplicationMessageReceivedEventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Client.Receiving
_handler = context =>
{
handler(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 1
- 2
Source/MQTTnet/Formatter/IMqttDataConverter.cs 查看文件

@@ -8,7 +8,6 @@ using MQTTnet.Client.Unsubscribing;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

namespace MQTTnet.Formatter
{
@@ -22,7 +21,7 @@ namespace MQTTnet.Formatter

MqttClientPublishResult CreateClientPublishResult(MqttPubRecPacket pubRecPacket, MqttPubCompPacket pubCompPacket);
MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket);
Client.Subscribing.MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket);
MqttClientUnsubscribeResult CreateClientUnsubscribeResult(MqttUnsubscribePacket unsubscribePacket, MqttUnsubAckPacket unsubAckPacket);



+ 1
- 0
Source/MQTTnet/MqttTopicFilter.cs 查看文件

@@ -8,6 +8,7 @@ namespace MQTTnet
{
}

// TODO: Consider using struct instead.
public class MqttTopicFilter
{
/// <summary>


+ 2
- 0
Source/MQTTnet/Protocol/MqttRetainHandling.cs 查看文件

@@ -3,7 +3,9 @@
public enum MqttRetainHandling
{
SendAtSubscribe = 0,
SendAtSubscribeIfNewSubscriptionOnly = 1,
DoNotSendOnSubscribe = 2
}
}

Source/MQTTnet/Server/CheckSubscriptionsResult.cs → Source/MQTTnet/Server/Internal/CheckSubscriptionsResult.cs 查看文件

@@ -1,6 +1,6 @@
using MQTTnet.Protocol;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public struct CheckSubscriptionsResult
{

Source/MQTTnet/Server/MqttClientConnection.cs → Source/MQTTnet/Server/Internal/MqttClientConnection.cs 查看文件

@@ -1,94 +1,82 @@
using MQTTnet.Adapter;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Internal;
using MQTTnet.PacketDispatcher;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Disconnecting;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttClientConnection : IDisposable
{
readonly Dictionary<ushort, string> _topicAlias = new Dictionary<ushort, string>();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();

readonly IMqttRetainedMessagesManager _retainedMessagesManager;
readonly MqttClientSessionsManager _sessionsManager;

readonly IMqttChannelAdapter _channelAdapter;
readonly IMqttNetScopedLogger _logger;
readonly IMqttServerOptions _serverOptions;

readonly IMqttChannelAdapter _channelAdapter;
readonly MqttConnectionValidatorContext _connectionValidatorContext;
readonly IMqttDataConverter _dataConverter;
readonly string _endpoint;
readonly DateTime _connectedTimestamp;

volatile Task _packageReceiverTask;
DateTime _lastNonKeepAlivePacketReceivedTimestamp;

long _receivedPacketsCount;
long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
long _receivedApplicationMessagesCount;
long _sentApplicationMessagesCount;
MqttClientDisconnectReason _disconnectReason;
CancellationTokenSource _cancellationToken;

public MqttClientConnection(MqttConnectPacket connectPacket,
public MqttClientConnection(
MqttConnectPacket connectPacket,
IMqttChannelAdapter channelAdapter,
MqttClientSession session,
MqttConnectionValidatorContext connectionValidatorContext,
IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager,
IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetLogger logger)
{
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));

_channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
_connectionValidatorContext = connectionValidatorContext ?? throw new ArgumentNullException(nameof(connectionValidatorContext));
_dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
_endpoint = _channelAdapter.Endpoint;
_endpoint = channelAdapter.Endpoint;
Session = session ?? throw new ArgumentNullException(nameof(session));
ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttClientConnection));

_connectedTimestamp = DateTime.UtcNow;
LastPacketReceivedTimestamp = _connectedTimestamp;
_lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}

public MqttClientConnectionStatus Status { get; private set; } = MqttClientConnectionStatus.Initializing;
public string ClientId => ConnectPacket.ClientId;

public MqttConnectPacket ConnectPacket { get; }
public string Endpoint => _endpoint;

public string ClientId => ConnectPacket.ClientId;
public MqttClientConnectionStatistics Statistics { get; } = new MqttClientConnectionStatistics();

public bool IsReadingPacket => _channelAdapter.IsReadingPacket;
public bool IsRunning { get; private set; }

public MqttConnectPacket ConnectPacket { get; }

public DateTime LastPacketReceivedTimestamp { get; private set; }
public bool IsReadingPacket => _channelAdapter.IsReadingPacket;

public MqttClientSession Session { get; }

public bool IsTakenOver { get; set; }

public bool IsCleanDisconnect { get; private set; }

public async Task StopAsync(MqttClientDisconnectReason reason)
{
Status = MqttClientConnectionStatus.Finalizing;
_disconnectReason = reason;
IsRunning = false;

if (reason == MqttClientDisconnectReason.SessionTakenOver || reason == MqttClientDisconnectReason.KeepAliveTimeout)
{
@@ -96,30 +84,10 @@ namespace MQTTnet.Server
// token because the entire connection is closed (disposed) as soon as the cancellation
// token is cancelled. To there is no chance that the DISCONNECT packet will ever arrive
// at the client!
try
{
var disconnectOptions = new MqttClientDisconnectOptions
{
ReasonCode = reason,
ReasonString = reason.ToString()
};
var disconnectPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(disconnectOptions);

using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout))
{
await _channelAdapter.SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false);
}
}
catch (Exception exception)
{
_logger.Warning(exception, "Client '{0}': Error while sending DISCONNECT packet after takeover.", ClientId);
}
await TrySendDisconnectPacket(reason).ConfigureAwait(false);
}

StopInternal();

await (_packageReceiverTask ?? PlatformAbstractionLayer.CompletedTask);
}

public void ResetStatistics()
@@ -127,24 +95,16 @@ namespace MQTTnet.Server
_channelAdapter.ResetStatistics();
}

public void FillStatus(MqttClientStatus status)
public void FillClientStatus(MqttClientStatus clientStatus)
{
status.ClientId = ClientId;
status.Endpoint = _endpoint;
status.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion;
clientStatus.ClientId = ClientId;
clientStatus.Endpoint = _endpoint;

status.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount);
status.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount);
clientStatus.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion;
clientStatus.BytesSent = _channelAdapter.BytesSent;
clientStatus.BytesReceived = _channelAdapter.BytesReceived;

status.ReceivedPacketsCount = Interlocked.Read(ref _receivedPacketsCount);
status.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount);

status.ConnectedTimestamp = _connectedTimestamp;
status.LastPacketReceivedTimestamp = LastPacketReceivedTimestamp;
status.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp;

status.BytesSent = _channelAdapter.BytesSent;
status.BytesReceived = _channelAdapter.BytesReceived;
Statistics.FillClientStatus(clientStatus);
}

public void Dispose()
@@ -152,31 +112,60 @@ namespace MQTTnet.Server
_cancellationToken.Dispose();
}

public Task RunAsync()
public async Task RunAsync()
{
_packageReceiverTask = RunInternalAsync(_cancellationToken.Token);
return _packageReceiverTask;
}
_logger.Info("Client '{0}': Session started.", ClientId);

async Task RunInternalAsync(CancellationToken cancellationToken)
{
var disconnectType = MqttClientDisconnectType.NotClean;
try
Session.WillMessage = ConnectPacket.WillMessage;
using (var cancellationToken = new CancellationTokenSource())
{
_logger.Info("Client '{0}': Session started.", ClientId);
_cancellationToken = cancellationToken;

try
{
Task.Run(() => SendPacketsLoop(cancellationToken.Token), cancellationToken.Token).RunInBackground(_logger);

Session.WillMessage = ConnectPacket.WillMessage;
Session.IsCleanSession = false;

IsRunning = true;

await ReceivePackagesLoop(cancellationToken.Token).ConfigureAwait(false);
}
finally
{
IsRunning = false;
cancellationToken.Cancel();
_cancellationToken = null;
}
}
_packetDispatcher.CancelAll();

await SendAsync(_channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(_connectionValidatorContext), cancellationToken).ConfigureAwait(false);
if (!IsTakenOver && !IsCleanDisconnect && Session.WillMessage != null)
{
_sessionsManager.DispatchApplicationMessage(Session.WillMessage, this);
Session.WillMessage = null;
}

Task.Run(() => SendPendingPacketsAsync(cancellationToken), cancellationToken).RunInBackground(_logger);
_logger.Info("Client '{0}': Connection stopped.", ClientId);
}

Session.IsCleanSession = false;
Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
return _channelAdapter.SendPacketAsync(packet, cancellationToken).ContinueWith(task => { Statistics.HandleSentPacket(packet); }, cancellationToken);
}

async Task ReceivePackagesLoop(CancellationToken cancellationToken)
{
try
{
// We do not listen for the cancellation token here because the internal buffer might still
// contain data to be read even if the TCP connection was already dropped. So we rely on an
// own exception in the reading loop!
while (!cancellationToken.IsCancellationRequested)
{
Status = MqttClientConnectionStatus.Running;

var packet = await _channelAdapter.ReceivePacketAsync(cancellationToken).ConfigureAwait(false);
if (packet == null)
{
@@ -184,13 +173,7 @@ namespace MQTTnet.Server
return;
}

Interlocked.Increment(ref _sentPacketsCount);
LastPacketReceivedTimestamp = DateTime.UtcNow;

if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket))
{
_lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}
Statistics.HandleReceivedPacket(packet);

if (packet is MqttPublishPacket publishPacket)
{
@@ -210,7 +193,8 @@ namespace MQTTnet.Server
}
else if (packet is MqttPingReqPacket)
{
await SendAsync(MqttPingRespPacket.Instance, cancellationToken).ConfigureAwait(false);
// See: The Server MUST send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1].
await SendPacketAsync(MqttPingRespPacket.Instance, cancellationToken).ConfigureAwait(false);
}
else if (packet is MqttPingRespPacket)
{
@@ -218,10 +202,7 @@ namespace MQTTnet.Server
}
else if (packet is MqttDisconnectPacket)
{
Session.WillMessage = null;
disconnectType = MqttClientDisconnectType.Clean;

StopInternal();
IsCleanDisconnect = true;
return;
}
else
@@ -246,40 +227,111 @@ namespace MQTTnet.Server
{
_logger.Error(exception, "Client '{0}': Error while receiving client packets.", ClientId);
}

StopInternal();
}
finally
}

async Task SendPacketsLoop(CancellationToken cancellationToken)
{
MqttQueuedApplicationMessage queuedApplicationMessage = null;
MqttPublishPacket publishPacket = null;

try
{
if (_disconnectReason == MqttClientDisconnectReason.SessionTakenOver)
while (!cancellationToken.IsCancellationRequested)
{
disconnectType = MqttClientDisconnectType.Takeover;
}
queuedApplicationMessage = await Session.ApplicationMessagesQueue.DequeueAsync(cancellationToken).ConfigureAwait(false);
if (queuedApplicationMessage == null)
{
return;
}

if (Session.WillMessage != null)
{
_sessionsManager.DispatchApplicationMessage(Session.WillMessage, this);
Session.WillMessage = null;
}
if (cancellationToken.IsCancellationRequested)
{
return;
}

_packetDispatcher.CancelAll();
publishPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(queuedApplicationMessage.ApplicationMessage);
publishPacket.QualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel;

_logger.Info("Client '{0}': Connection stopped.", ClientId);
// Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9].
publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage;

try
publishPacket = await InvokeClientMessageQueueInterceptor(publishPacket, queuedApplicationMessage).ConfigureAwait(false);
if (publishPacket == null)
{
// The interceptor has decided that the message is not relevant and will be fully ignored.
continue;
}

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
using (var awaiter = _packetDispatcher.AddAwaiter<MqttPubAckPacket>(publishPacket.PacketIdentifier))
{
await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
await awaiter.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
}
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
using (var awaiter1 = _packetDispatcher.AddAwaiter<MqttPubRecPacket>(publishPacket.PacketIdentifier))
using (var awaiter2 = _packetDispatcher.AddAwaiter<MqttPubCompPacket>(publishPacket.PacketIdentifier))
{
await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
var pubRecPacket = await awaiter1.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);

var pubRelPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success);
await SendPacketAsync(pubRelPacket, cancellationToken).ConfigureAwait(false);

await awaiter2.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
}
}

_logger.Verbose("Client '{0}': Queued application message sent.", ClientId);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning(exception, "Client '{0}': Sending publish packet failed: Timeout.", ClientId);
}
else if (exception is MqttCommunicationException)
{
await _sessionsManager.CleanUpClient(ClientId, _channelAdapter, disconnectType);
_logger.Warning(exception, "Client '{0}': Sending publish packet failed: Communication exception.", ClientId);
}
catch (Exception e)
else
{
_logger.Error(e, "Client '{0}': Error while cleaning up", ClientId);
_logger.Error(exception, "Client '{0}': Sending publish packet failed.", ClientId);
}

if (publishPacket?.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
if (queuedApplicationMessage != null)
{
queuedApplicationMessage.IsDuplicate = true;
Session.ApplicationMessagesQueue.Enqueue(queuedApplicationMessage);
}
}

StopInternal();
}
}

void StopInternal()
{
_cancellationToken.Cancel();
_cancellationToken?.Cancel();
}

async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<MqttTopicFilter> topicFilters)
@@ -294,7 +346,7 @@ namespace MQTTnet.Server
Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
{
var pubCompPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubCompPacket(pubRelPacket, MqttApplicationMessageReceivedReasonCode.Success);
return SendAsync(pubCompPacket, cancellationToken);
return SendPacketAsync(pubCompPacket, cancellationToken);
}

async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
@@ -302,7 +354,7 @@ namespace MQTTnet.Server
var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false);
var subAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateSubAckPacket(subscribePacket, subscribeResult);

await SendAsync(subAckPacket, cancellationToken).ConfigureAwait(false);
await SendPacketAsync(subAckPacket, cancellationToken).ConfigureAwait(false);

if (subscribeResult.CloseConnection)
{
@@ -318,38 +370,36 @@ namespace MQTTnet.Server
var reasonCodes = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
var unsubAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateUnsubAckPacket(unsubscribePacket, reasonCodes);

await SendAsync(unsubAckPacket, cancellationToken).ConfigureAwait(false);
await SendPacketAsync(unsubAckPacket, cancellationToken).ConfigureAwait(false);
}

Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
{
Interlocked.Increment(ref _sentApplicationMessagesCount);

HandleTopicAlias(publishPacket);

var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket);
var applicationMessage = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket);
_sessionsManager.DispatchApplicationMessage(applicationMessage, this);

switch (publishPacket.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
return PlatformAbstractionLayer.CompletedTask;
}
{
return PlatformAbstractionLayer.CompletedTask;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
var pubAckPacket = _dataConverter.CreatePubAckPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success);
return SendAsync(pubAckPacket, cancellationToken);
}
{
var pubAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success);
return SendPacketAsync(pubAckPacket, cancellationToken);
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
var pubRecPacket = _dataConverter.CreatePubRecPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success);
return SendAsync(pubRecPacket, cancellationToken);
}
{
var pubRecPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success);
return SendPacketAsync(pubRecPacket, cancellationToken);
}
default:
{
throw new MqttCommunicationException("Received a not supported QoS level.");
}
{
throw new MqttCommunicationException("Received a not supported QoS level.");
}
}
}

@@ -376,134 +426,64 @@ namespace MQTTnet.Server
}
else
{

}
}
}
}
async Task SendPendingPacketsAsync(CancellationToken cancellationToken)
async Task TrySendDisconnectPacket(MqttClientDisconnectReason reason)
{
MqttQueuedApplicationMessage queuedApplicationMessage = null;
MqttPublishPacket publishPacket = null;

try
{
while (!cancellationToken.IsCancellationRequested)
var disconnectOptions = new MqttClientDisconnectOptions
{
queuedApplicationMessage = await Session.ApplicationMessagesQueue.DequeueAsync(cancellationToken).ConfigureAwait(false);
if (queuedApplicationMessage == null)
{
return;
}

if (cancellationToken.IsCancellationRequested)
{
return;
}

publishPacket = _dataConverter.CreatePublishPacket(queuedApplicationMessage.ApplicationMessage);
publishPacket.QualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel;

// Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9].
publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage;

if (_serverOptions.ClientMessageQueueInterceptor != null)
{
var context = new MqttClientMessageQueueInterceptorContext(
queuedApplicationMessage.SenderClientId,
ClientId,
queuedApplicationMessage.ApplicationMessage,
queuedApplicationMessage.SubscriptionQualityOfServiceLevel);

if (_serverOptions.ClientMessageQueueInterceptor != null)
{
await _serverOptions.ClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(context).ConfigureAwait(false);
}

if (!context.AcceptEnqueue || context.ApplicationMessage == null)
{
return;
}
ReasonCode = reason,
ReasonString = reason.ToString()
};

publishPacket.Topic = context.ApplicationMessage.Topic;
publishPacket.Payload = context.ApplicationMessage.Payload;
publishPacket.QualityOfServiceLevel = context.SubscriptionQualityOfServiceLevel;
}

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier();
}
var disconnectPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(disconnectOptions);

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
var awaiter = _packetDispatcher.AddAwaiter<MqttPubAckPacket>(publishPacket.PacketIdentifier);
await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false);
await awaiter.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
using (var awaiter1 = _packetDispatcher.AddAwaiter<MqttPubRecPacket>(publishPacket.PacketIdentifier))
using (var awaiter2 = _packetDispatcher.AddAwaiter<MqttPubCompPacket>(publishPacket.PacketIdentifier))
{
await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false);
var pubRecPacket = await awaiter1.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);

var pubRelPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success);
await SendAsync(pubRelPacket, cancellationToken).ConfigureAwait(false);

await awaiter2.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false);
}
}

_logger.Verbose("Client '{0}': Queued application message sent.", ClientId);
using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout))
{
await SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false);
}
}
catch (Exception exception)
{
if (exception is OperationCanceledException)
{
}
else if (exception is MqttCommunicationTimedOutException)
{
_logger.Warning(exception, "Client '{0}': Sending publish packet failed: Timeout.", ClientId);
}
else if (exception is MqttCommunicationException)
{
_logger.Warning(exception, "Client '{0}': Sending publish packet failed: Communication exception.", ClientId);
}
else
{
_logger.Error(exception, "Client '{0}': Sending publish packet failed.", ClientId);
}
_logger.Warning(exception, "Client '{{0}}': Error while sending DISCONNECT packet (Reason = {1}).", ClientId, reason);
}
}

if (publishPacket?.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
queuedApplicationMessage.IsDuplicate = true;
async Task<MqttPublishPacket> InvokeClientMessageQueueInterceptor(MqttPublishPacket publishPacket, MqttQueuedApplicationMessage queuedApplicationMessage)
{
if (_serverOptions.ClientMessageQueueInterceptor == null)
{
return publishPacket;
}

Session.ApplicationMessagesQueue.Enqueue(queuedApplicationMessage);
}
var context = new MqttClientMessageQueueInterceptorContext
{
SenderClientId = queuedApplicationMessage.SenderClientId,
ReceiverClientId = ClientId,
ApplicationMessage = queuedApplicationMessage.ApplicationMessage,
SubscriptionQualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel
};

StopInternal();
if (_serverOptions.ClientMessageQueueInterceptor != null)
{
await _serverOptions.ClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(context).ConfigureAwait(false);
}
}

Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
return _channelAdapter.SendPacketAsync(packet, cancellationToken).ContinueWith(task =>
if (!context.AcceptEnqueue || context.ApplicationMessage == null)
{
Interlocked.Increment(ref _receivedPacketsCount);
return null;
}

if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _receivedApplicationMessagesCount);
}
}, cancellationToken);
publishPacket.Topic = context.ApplicationMessage.Topic;
publishPacket.Payload = context.ApplicationMessage.Payload;
publishPacket.QualityOfServiceLevel = context.SubscriptionQualityOfServiceLevel;
return publishPacket;
}
}
}
}

+ 90
- 0
Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs 查看文件

@@ -0,0 +1,90 @@
using System;
using System.Threading;
using MQTTnet.Packets;
using MQTTnet.Server.Status;

namespace MQTTnet.Server.Internal
{
public sealed class MqttClientConnectionStatistics
{
readonly DateTime _connectedTimestamp;

DateTime _lastNonKeepAlivePacketReceivedTimestamp;
DateTime _lastPacketReceivedTimestamp;
DateTime _lastPacketSentTimestamp;

// Start with 1 because the CONNACK packet is not counted here.
long _receivedPacketsCount = 1;

// Start with 1 because the CONNECT packet is not counted here.
long _sentPacketsCount = 1;

long _receivedApplicationMessagesCount;
long _sentApplicationMessagesCount;

public MqttClientConnectionStatistics()
{
_connectedTimestamp = DateTime.UtcNow;

_lastPacketReceivedTimestamp = _connectedTimestamp;
_lastPacketSentTimestamp = _connectedTimestamp;

_lastNonKeepAlivePacketReceivedTimestamp = _connectedTimestamp;
}

public DateTime LastPacketReceivedTimestamp => _lastPacketReceivedTimestamp;

public void HandleReceivedPacket(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
// This class is tracking all values from Clients perspective!
_lastPacketSentTimestamp = DateTime.UtcNow;

Interlocked.Increment(ref _sentPacketsCount);

if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _sentApplicationMessagesCount);
}

if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket))
{
_lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp;
}
}

public void HandleSentPacket(MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
// This class is tracking all values from Clients perspective!
_lastPacketReceivedTimestamp = DateTime.UtcNow;

Interlocked.Increment(ref _receivedPacketsCount);

if (packet is MqttPublishPacket)
{
Interlocked.Increment(ref _receivedApplicationMessagesCount);
}
}

public void FillClientStatus(MqttClientStatus clientStatus)
{
if (clientStatus == null) throw new ArgumentNullException(nameof(clientStatus));
clientStatus.ConnectedTimestamp = _connectedTimestamp;

clientStatus.ReceivedPacketsCount = Interlocked.Read(ref _receivedPacketsCount);
clientStatus.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount);

clientStatus.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount);
clientStatus.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount);

clientStatus.LastPacketReceivedTimestamp = _lastPacketReceivedTimestamp;
clientStatus.LastPacketSentTimestamp = _lastPacketSentTimestamp;

clientStatus.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp;
}
}
}

Source/MQTTnet/Server/MqttClientSession.cs → Source/MQTTnet/Server/Internal/MqttClientSession.cs 查看文件

@@ -1,10 +1,10 @@
using MQTTnet.Diagnostics;
using MQTTnet.Server.Status;
using System;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Server.Status;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttClientSession
{
@@ -81,7 +81,7 @@ namespace MQTTnet.Server
return SubscriptionsManager.UnsubscribeAsync(topicFilters);
}

public void FillStatus(MqttSessionStatus status)
public void FillSessionStatus(MqttSessionStatus status)
{
status.ClientId = ClientId;
status.CreatedTimestamp = _createdTimestamp;

Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs → Source/MQTTnet/Server/Internal/MqttClientSessionApplicationMessagesQueue.cs 查看文件

@@ -1,10 +1,10 @@
using MQTTnet.Internal;
using MQTTnet.Protocol;
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Internal;
using MQTTnet.Protocol;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttClientSessionApplicationMessagesQueue : IDisposable
{

Source/MQTTnet/Server/MqttClientSessionsManager.cs → Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs 查看文件

@@ -1,4 +1,11 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
@@ -6,15 +13,8 @@ using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Disconnecting;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttClientSessionsManager : IDisposable
{
@@ -53,53 +53,66 @@ namespace MQTTnet.Server
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(cancellationToken), cancellationToken).RunInBackground(_logger);
}

public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
async Task<MqttConnectPacket> ReceiveConnectPacket(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
try
{
MqttConnectPacket connectPacket;
try
using (var timeoutToken = new CancellationTokenSource(_options.DefaultCommunicationTimeout))
using (var effectiveCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken.Token, cancellationToken))
{
using (var timeoutToken = new CancellationTokenSource(_options.DefaultCommunicationTimeout))
{
var firstPacket = await channelAdapter.ReceivePacketAsync(timeoutToken.Token).ConfigureAwait(false);
connectPacket = firstPacket as MqttConnectPacket;
if (connectPacket == null)
{
_logger.Warning(null,
"Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].",
channelAdapter.Endpoint);
var firstPacket = await channelAdapter.ReceivePacketAsync(effectiveCancellationToken.Token).ConfigureAwait(false);

return;
}
if (firstPacket is MqttConnectPacket connectPacket)
{
return connectPacket;
}
}
catch (OperationCanceledException)
{
_logger.Warning(null, "Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
return;
}
catch (MqttCommunicationTimedOutException)
}
catch (OperationCanceledException)
{
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
}

_logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
return null;
}
public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
MqttClientConnection clientConnection = null;

try
{
var connectPacket = await ReceiveConnectPacket(channelAdapter, cancellationToken).ConfigureAwait(false);
if (connectPacket == null)
{
_logger.Warning(null, "Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
// Nothing was received in time etc.
return;
}

var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

MqttConnAckPacket connAckPacket;
var connectionValidatorContext = await ValidateConnection(connectPacket, channelAdapter).ConfigureAwait(false);
if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
// Send failure response here without preparing a session!
connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false);

return;
}

var connection = await CreateClientConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);
await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket.ClientId).ConfigureAwait(false);
await connection.RunAsync().ConfigureAwait(false);
clientConnection = await CreateClientConnection(connectPacket, channelAdapter, connectionValidatorContext.SessionItems).ConfigureAwait(false);

connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false);

await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket, channelAdapter).ConfigureAwait(false);

await clientConnection.RunAsync().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -108,6 +121,41 @@ namespace MQTTnet.Server
{
_logger.Error(exception, exception.Message);
}
finally
{
if (clientConnection != null)
{
if (clientConnection.ClientId != null)
{
// in case it is a takeover _connections already contains the new connection
if (!clientConnection.IsTakenOver)
{
lock (_connections)
{
_connections.Remove(clientConnection.ClientId);
}

if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientConnection.ClientId).ConfigureAwait(false);
}
}
}

var endpoint = clientConnection.Endpoint;

if (clientConnection.ClientId != null && !clientConnection.IsTakenOver)
{
// The event is fired at a separate place in case of a handover!
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(
clientConnection.ClientId,
clientConnection.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean,
endpoint).ConfigureAwait(false);
}
}

await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
}

public async Task CloseAllConnectionsAsync()
@@ -132,7 +180,7 @@ namespace MQTTnet.Server
return _connections.Values.ToList();
}
}
public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
{
var result = new List<IMqttClientStatus>();
@@ -142,10 +190,10 @@ namespace MQTTnet.Server
foreach (var connection in _connections.Values)
{
var clientStatus = new MqttClientStatus(connection);
connection.FillStatus(clientStatus);
connection.FillClientStatus(clientStatus);

var sessionStatus = new MqttSessionStatus(connection.Session, this);
connection.Session.FillStatus(sessionStatus);
connection.Session.FillSessionStatus(sessionStatus);
clientStatus.Session = sessionStatus;

result.Add(clientStatus);
@@ -164,7 +212,7 @@ namespace MQTTnet.Server
foreach (var session in _sessions.Values)
{
var sessionStatus = new MqttSessionStatus(session, this);
session.FillStatus(sessionStatus);
session.FillSessionStatus(sessionStatus);

result.Add(sessionStatus);
}
@@ -203,7 +251,7 @@ namespace MQTTnet.Server
{
_connections.TryGetValue(clientId, out connection);
}
lock (_sessions)
{
_sessions.Remove(clientId);
@@ -213,37 +261,8 @@ namespace MQTTnet.Server
{
await connection.StopAsync(MqttClientDisconnectReason.NormalDisconnection).ConfigureAwait(false);
}
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public async Task CleanUpClient(string clientId, IMqttChannelAdapter channelAdapter, MqttClientDisconnectType disconnectType)
{
if (clientId != null)
{
// in case it is a takeover _connections already contains the new connection
if (disconnectType != MqttClientDisconnectType.Takeover)
{
lock (_connections)
{
_connections.Remove(clientId);
}

if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
}
}

var endpoint = channelAdapter.Endpoint;

await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false);

if (clientId != null)
{
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType, endpoint).ConfigureAwait(false);
}
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public void Dispose()
@@ -278,7 +297,7 @@ namespace MQTTnet.Server
MqttPendingApplicationMessage queuedApplicationMessage;
try
{
queuedApplicationMessage = _messageQueue.Take(cancellationToken);
queuedApplicationMessage = _messageQueue.Take(cancellationToken);
}
catch (ArgumentNullException)
{
@@ -315,7 +334,7 @@ namespace MQTTnet.Server
applicationMessage = interceptorContext.ApplicationMessage;
}
}
await _eventDispatcher.SafeNotifyApplicationMessageReceivedAsync(senderClientId, applicationMessage).ConfigureAwait(false);

if (applicationMessage.Retain)
@@ -360,9 +379,12 @@ namespace MQTTnet.Server
}
}

async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
async Task<MqttConnectionValidatorContext> ValidateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary<object, object>());
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter)
{
SessionItems = new ConcurrentDictionary<object, object>()
};

var connectionValidator = _options.ConnectionValidator;

@@ -388,9 +410,8 @@ namespace MQTTnet.Server
return context;
}

async Task<MqttClientConnection> CreateClientConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
async Task<MqttClientConnection> CreateClientConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, IDictionary<object, object> sessionItems)
{
MqttClientConnection existingConnection;
MqttClientConnection connection;

using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false))
@@ -401,14 +422,14 @@ namespace MQTTnet.Server
if (!_sessions.TryGetValue(connectPacket.ClientId, out session))
{
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, connectionValidatorContext);
session = CreateSession(connectPacket.ClientId, sessionItems);
}
else
{
if (connectPacket.CleanSession)
{
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, connectionValidatorContext);
session = CreateSession(connectPacket.ClientId, sessionItems);
}
else
{
@@ -419,16 +440,21 @@ namespace MQTTnet.Server
_sessions[connectPacket.ClientId] = session;
}

MqttClientConnection existingConnection;

lock (_connections)
{
_connections.TryGetValue(connectPacket.ClientId, out existingConnection);
connection = CreateConnection(connectPacket, channelAdapter, session, connectionValidatorContext);
connection = CreateConnection(connectPacket, channelAdapter, session);

_connections[connectPacket.ClientId] = connection;
}

if (existingConnection != null)
{
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(existingConnection.ClientId, MqttClientDisconnectType.Takeover, existingConnection.Endpoint);

existingConnection.IsTakenOver = true;
await existingConnection.StopAsync(MqttClientDisconnectReason.SessionTakenOver).ConfigureAwait(false);
}
}
@@ -436,7 +462,8 @@ namespace MQTTnet.Server
return connection;
}

async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(IMqttServerApplicationMessageInterceptor interceptor, MqttClientConnection clientConnection, MqttApplicationMessage applicationMessage)
async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(IMqttServerApplicationMessageInterceptor interceptor, MqttClientConnection clientConnection,
MqttApplicationMessage applicationMessage)
{
string senderClientId;
IDictionary<object, object> sessionItems;
@@ -453,29 +480,20 @@ namespace MQTTnet.Server
sessionItems = clientConnection.Session.Items;
}

var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, _logger)
var interceptorContext = new MqttApplicationMessageInterceptorContext
{
ClientId = senderClientId,
SessionItems = sessionItems,
Logger = _logger,
AcceptPublish = true,
ApplicationMessage = applicationMessage,
CloseConnection = false
CloseConnection = false
};

await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false);
return interceptorContext;
}

async Task SafeCleanupChannelAsync(IMqttChannelAdapter channelAdapter)
{
try
{
await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while disconnecting client channel.");
}
}

MqttClientSession GetSession(string clientId)
{
lock (_sessions)
@@ -489,24 +507,23 @@ namespace MQTTnet.Server
}
}

MqttClientConnection CreateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttClientSession session, MqttConnectionValidatorContext connectionValidatorContext)
MqttClientConnection CreateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttClientSession session)
{
return new MqttClientConnection(
connectPacket,
channelAdapter,
session,
connectionValidatorContext,
_options,
this,
_retainedMessagesManager,
_rootLogger);
}

MqttClientSession CreateSession(string clientId, MqttConnectionValidatorContext connectionValidatorContext)
MqttClientSession CreateSession(string clientId, IDictionary<object, object> sessionItems)
{
return new MqttClientSession(
clientId,
connectionValidatorContext.SessionItems,
sessionItems,
_eventDispatcher,
_options,
_retainedMessagesManager,

Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs → Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs 查看文件

@@ -1,12 +1,12 @@
using MQTTnet.Packets;
using MQTTnet.Protocol;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Packets;
using MQTTnet.Protocol;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttClientSubscriptionsManager
{
@@ -37,9 +37,11 @@ namespace MQTTnet.Server
{
var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false);

var finalTopicFilter = interceptorContext.TopicFilter;
var finalTopicFilter = interceptorContext?.TopicFilter ?? originalTopicFilter;
var acceptSubscription = interceptorContext?.AcceptSubscription ?? true;
var closeConnection = interceptorContext?.CloseConnection ?? false;

if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !interceptorContext.AcceptSubscription)
if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !acceptSubscription)
{
result.ReturnCodes.Add(MqttSubscribeReturnCode.Failure);
result.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError);
@@ -50,12 +52,12 @@ namespace MQTTnet.Server
result.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel));
}

if (interceptorContext.CloseConnection)
if (closeConnection)
{
result.CloseConnection = true;
}

if (!interceptorContext.AcceptSubscription || string.IsNullOrEmpty(finalTopicFilter?.Topic))
if (!acceptSubscription || string.IsNullOrEmpty(finalTopicFilter?.Topic))
{
continue;
}
@@ -83,12 +85,7 @@ namespace MQTTnet.Server
foreach (var topicFilter in topicFilters)
{
var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptSubscription)
{
continue;
}

if (!interceptorContext.AcceptSubscription)
if (interceptorContext != null && !interceptorContext.AcceptSubscription)
{
continue;
}
@@ -116,7 +113,7 @@ namespace MQTTnet.Server
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptUnsubscription)
if (interceptorContext != null && !interceptorContext.AcceptUnsubscription)
{
reasonCodes.Add(MqttUnsubscribeReasonCode.ImplementationSpecificError);
continue;
@@ -150,7 +147,7 @@ namespace MQTTnet.Server
foreach (var topicFilter in topicFilters)
{
var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptUnsubscription)
if (interceptorContext != null && !interceptorContext.AcceptUnsubscription)
{
continue;
}
@@ -187,7 +184,7 @@ namespace MQTTnet.Server
{
continue;
}
qosLevels.Add(subscription.Value.QualityOfServiceLevel);
}

@@ -211,23 +208,41 @@ namespace MQTTnet.Server

async Task<MqttSubscriptionInterceptorContext> InterceptSubscribeAsync(MqttTopicFilter topicFilter)
{
var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items);
if (_options.SubscriptionInterceptor != null)
var interceptor = _options.SubscriptionInterceptor;
if (interceptor == null)
{
await _options.SubscriptionInterceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false);
return null;
}

var context = new MqttSubscriptionInterceptorContext
{
ClientId = _clientSession.ClientId,
TopicFilter = topicFilter,
SessionItems = _clientSession.Items
};

await interceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false);

return context;
}

async Task<MqttUnsubscriptionInterceptorContext> InterceptUnsubscribeAsync(string topicFilter)
{
var context = new MqttUnsubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items);
if (_options.UnsubscriptionInterceptor != null)
var interceptor = _options.UnsubscriptionInterceptor;
if (interceptor == null)
{
await _options.UnsubscriptionInterceptor.InterceptUnsubscriptionAsync(context).ConfigureAwait(false);
return null;
}

var context = new MqttUnsubscriptionInterceptorContext
{
ClientId = _clientSession.ClientId,
Topic = topicFilter,
SessionItems = _clientSession.Items
};

await interceptor.InterceptUnsubscriptionAsync(context).ConfigureAwait(false);

return context;
}

@@ -254,4 +269,4 @@ namespace MQTTnet.Server
};
}
}
}
}

Source/MQTTnet/Server/MqttRetainedMessagesManager.cs → Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs 查看文件

@@ -1,13 +1,13 @@
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Internal;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttRetainedMessagesManager : IMqttRetainedMessagesManager
{

Source/MQTTnet/Server/MqttServerEventDispatcher.cs → Source/MQTTnet/Server/Internal/MqttServerEventDispatcher.cs 查看文件

@@ -1,9 +1,11 @@
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using System;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttServerEventDispatcher
{
@@ -26,7 +28,7 @@ namespace MQTTnet.Server

public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }

public async Task SafeNotifyClientConnectedAsync(string clientId)
public async Task SafeNotifyClientConnectedAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
try
{
@@ -36,7 +38,13 @@ namespace MQTTnet.Server
return;
}

await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)).ConfigureAwait(false);
await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs
{
ClientId = connectPacket.ClientId,
UserName = connectPacket.Username,
ProtocolVersion = channelAdapter.PacketFormatterAdapter.ProtocolVersion,
Endpoint = channelAdapter.Endpoint
}).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -54,7 +62,12 @@ namespace MQTTnet.Server
return;
}

await handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType, endpoint)).ConfigureAwait(false);
await handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs
{
ClientId = clientId,
DisconnectType = disconnectType,
Endpoint = endpoint
}).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -72,7 +85,11 @@ namespace MQTTnet.Server
return;
}

await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false);
await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs
{
ClientId = clientId,
TopicFilter = topicFilter
}).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -90,7 +107,11 @@ namespace MQTTnet.Server
return;
}

await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false);
await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs
{
ClientId = clientId,
TopicFilter = topicFilter
}).ConfigureAwait(false);
}
catch (Exception exception)
{

Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs → Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs 查看文件

@@ -1,12 +1,12 @@
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Internal;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public sealed class MqttServerKeepAliveMonitor
{
@@ -70,7 +70,7 @@ namespace MQTTnet.Server
{
try
{
if (connection.Status != MqttClientConnectionStatus.Running)
if (!connection.IsRunning)
{
// The connection is already dead or just created so there is no need to check it.
return;
@@ -94,14 +94,14 @@ namespace MQTTnet.Server
// If the client sends 1 sec. the server will allow up to 1.5 seconds.
var maxDurationWithoutPacket = connection.ConnectPacket.KeepAlivePeriod * 1.5D;

var secondsWithoutPackage = (now - connection.LastPacketReceivedTimestamp).TotalSeconds;
var secondsWithoutPackage = (now - connection.Statistics.LastPacketReceivedTimestamp).TotalSeconds;
if (secondsWithoutPackage < maxDurationWithoutPacket)
{
// A packet was received before the timeout is affected.
return;
}

_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", connection.ClientId);
_logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", connection.ClientId);

// Execute the disconnection in background so that the keep alive monitor can continue
// with checking other connections.

Source/MQTTnet/Server/MqttTopicFilterComparer.cs → Source/MQTTnet/Server/Internal/MqttTopicFilterComparer.cs 查看文件

@@ -1,6 +1,6 @@
using System;

namespace MQTTnet.Server
namespace MQTTnet.Server.Internal
{
public static class MqttTopicFilterComparer
{

+ 4
- 12
Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs 查看文件

@@ -1,35 +1,27 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public sealed class MqttApplicationMessageInterceptorContext
{
public MqttApplicationMessageInterceptorContext(string clientId, IDictionary<object, object> sessionItems, IMqttNetScopedLogger logger)
{
ClientId = clientId;
SessionItems = sessionItems;
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

/// <summary>
/// Gets the currently used logger.
/// </summary>
public IMqttNetScopedLogger Logger { get; }
public IMqttNetScopedLogger Logger { get; internal set; }

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

public MqttApplicationMessage ApplicationMessage { get; set; }

/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary<object, object> SessionItems { get; }
public IDictionary<object, object> SessionItems { get; internal set; }

public bool AcceptPublish { get; set; } = true;



+ 0
- 11
Source/MQTTnet/Server/MqttClientConnectionStatus.cs 查看文件

@@ -1,11 +0,0 @@
namespace MQTTnet.Server
{
public enum MqttClientConnectionStatus
{
Initializing,

Running,

Finalizing
}
}

+ 3
- 11
Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs 查看文件

@@ -2,19 +2,11 @@

namespace MQTTnet.Server
{
public class MqttClientMessageQueueInterceptorContext
public sealed class MqttClientMessageQueueInterceptorContext
{
public MqttClientMessageQueueInterceptorContext(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, MqttQualityOfServiceLevel subscriptionQualityOfServiceLevel)
{
SenderClientId = senderClientId;
ReceiverClientId = receiverClientId;
ApplicationMessage = applicationMessage;
SubscriptionQualityOfServiceLevel = subscriptionQualityOfServiceLevel;
}
public string SenderClientId { get; internal set; }

public string SenderClientId { get; }

public string ReceiverClientId { get; }
public string ReceiverClientId { get; internal set; }

public MqttApplicationMessage ApplicationMessage { get; set; }



+ 4
- 3
Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttClientMessageQueueInterceptorDelegate : IMqttServerClientMessageQueueInterceptor
public sealed class MqttClientMessageQueueInterceptorDelegate : IMqttServerClientMessageQueueInterceptor
{
private readonly Func<MqttClientMessageQueueInterceptorContext, Task> _callback;
readonly Func<MqttClientMessageQueueInterceptorContext, Task> _callback;

public MqttClientMessageQueueInterceptorDelegate(Action<MqttClientMessageQueueInterceptorContext> callback)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_callback = context =>
{
callback(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 7
- 7
Source/MQTTnet/Server/MqttConnectionValidatorContext.cs 查看文件

@@ -4,21 +4,21 @@ using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet.Adapter;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Packets;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public class MqttConnectionValidatorContext
public sealed class MqttConnectionValidatorContext
{
private readonly MqttConnectPacket _connectPacket;
private readonly IMqttChannelAdapter _clientAdapter;
readonly MqttConnectPacket _connectPacket;
readonly IMqttChannelAdapter _clientAdapter;

public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter, IDictionary<object, object> sessionItems)
public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter)
{
_connectPacket = connectPacket;
_clientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
SessionItems = sessionItems;
}

/// <summary>
@@ -39,7 +39,7 @@ namespace MQTTnet.Server

public byte[] RawPassword => _connectPacket?.Password;

public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]);
public string Password => Encoding.UTF8.GetString(RawPassword ?? PlatformAbstractionLayer.EmptyByteArray);

/// <summary>
/// Gets or sets the will delay interval.
@@ -128,7 +128,7 @@ namespace MQTTnet.Server
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary<object, object> SessionItems { get; }
public IDictionary<object, object> SessionItems { get; internal set; }

/// <summary>
/// This is used for MQTTv3 only.


+ 3
- 1
Source/MQTTnet/Server/MqttPendingApplicationMessage.cs 查看文件

@@ -1,4 +1,6 @@
namespace MQTTnet.Server
using MQTTnet.Server.Internal;

namespace MQTTnet.Server
{
public sealed class MqttPendingApplicationMessage
{


+ 1
- 1
Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs 查看文件

@@ -3,7 +3,7 @@ using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public class MqttQueuedApplicationMessage
public sealed class MqttQueuedApplicationMessage
{
public MqttApplicationMessage ApplicationMessage { get; set; }



+ 1
- 0
Source/MQTTnet/Server/MqttServer.cs 查看文件

@@ -12,6 +12,7 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Implementations;
using MQTTnet.Internal;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server
{


+ 2
- 1
Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs 查看文件

@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_callback = context =>
{
callback(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 19
- 8
Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs 查看文件

@@ -1,18 +1,29 @@
using System;
using MQTTnet.Formatter;

namespace MQTTnet.Server
{
public class MqttServerClientConnectedEventArgs : EventArgs
public sealed class MqttServerClientConnectedEventArgs : EventArgs
{
public MqttServerClientConnectedEventArgs(string clientId)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
}

/// <summary>
/// Gets the client identifier.
/// Gets the client identifier of the connected client.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

/// <summary>
/// Gets the user name of the connected client.
/// </summary>
public string UserName { get; internal set; }

/// <summary>
/// Gets the protocol version which is used by the connected client.
/// </summary>
public MqttProtocolVersion ProtocolVersion { get; internal set; }

/// <summary>
/// Gets the endpoint of the connected client.
/// </summary>
public string Endpoint { get; internal set; }
}
}

+ 4
- 3
Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler
public sealed class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler
{
private readonly Func<MqttServerClientConnectedEventArgs, Task> _handler;
readonly Func<MqttServerClientConnectedEventArgs, Task> _handler;

public MqttServerClientConnectedHandlerDelegate(Action<MqttServerClientConnectedEventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 4
- 11
Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs 查看文件

@@ -2,23 +2,16 @@ using System;

namespace MQTTnet.Server
{
public class MqttServerClientDisconnectedEventArgs : EventArgs
public sealed class MqttServerClientDisconnectedEventArgs : EventArgs
{
public MqttServerClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType, string endpoint)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
DisconnectType = disconnectType;
Endpoint = endpoint;
}

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

public MqttClientDisconnectType DisconnectType { get; }
public MqttClientDisconnectType DisconnectType { get; internal set; }

public string Endpoint { get; }
public string Endpoint { get; internal set; }
}
}

+ 4
- 3
Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler
public sealed class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler
{
private readonly Func<MqttServerClientDisconnectedEventArgs, Task> _handler;
readonly Func<MqttServerClientDisconnectedEventArgs, Task> _handler;

public MqttServerClientDisconnectedHandlerDelegate(Action<MqttServerClientDisconnectedEventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 3
- 9
Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs 查看文件

@@ -2,24 +2,18 @@

namespace MQTTnet.Server
{
public class MqttServerClientSubscribedTopicEventArgs : EventArgs
public sealed class MqttServerClientSubscribedTopicEventArgs : EventArgs
{
public MqttServerClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
}

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

/// <summary>
/// Gets the topic filter.
/// The topic filter can contain topics and wildcards.
/// </summary>
public MqttTopicFilter TopicFilter { get; }
public MqttTopicFilter TopicFilter { get; internal set; }
}
}

+ 18
- 5
Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs 查看文件

@@ -1,24 +1,37 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerClientSubscribedHandlerDelegate : IMqttServerClientSubscribedTopicHandler
[Obsolete("Use MqttServerClientSubscribedTopicHandlerDelegate instead. This will be removed in a future version.")]
public sealed class MqttServerClientSubscribedHandlerDelegate : MqttServerClientSubscribedTopicHandlerDelegate
{
private readonly Func<MqttServerClientSubscribedTopicEventArgs, Task> _handler;
public MqttServerClientSubscribedHandlerDelegate(Action<MqttServerClientSubscribedTopicEventArgs> handler) : base(handler)
{
}

public MqttServerClientSubscribedHandlerDelegate(Func<MqttServerClientSubscribedTopicEventArgs, Task> handler) : base(handler)
{
}
}
public class MqttServerClientSubscribedTopicHandlerDelegate : IMqttServerClientSubscribedTopicHandler
{
readonly Func<MqttServerClientSubscribedTopicEventArgs, Task> _handler;

public MqttServerClientSubscribedHandlerDelegate(Action<MqttServerClientSubscribedTopicEventArgs> handler)
public MqttServerClientSubscribedTopicHandlerDelegate(Action<MqttServerClientSubscribedTopicEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}

public MqttServerClientSubscribedHandlerDelegate(Func<MqttServerClientSubscribedTopicEventArgs, Task> handler)
public MqttServerClientSubscribedTopicHandlerDelegate(Func<MqttServerClientSubscribedTopicEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}


+ 3
- 9
Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs 查看文件

@@ -2,24 +2,18 @@

namespace MQTTnet.Server
{
public class MqttServerClientUnsubscribedTopicEventArgs : EventArgs
public sealed class MqttServerClientUnsubscribedTopicEventArgs : EventArgs
{
public MqttServerClientUnsubscribedTopicEventArgs(string clientId, string topicFilter)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter));
}

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

/// <summary>
/// Gets or sets the topic filter.
/// The topic filter can contain topics and wildcards.
/// </summary>
public string TopicFilter { get; }
public string TopicFilter { get; internal set; }
}
}

+ 4
- 3
Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler
public sealed class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler
{
private readonly Func<MqttServerClientUnsubscribedTopicEventArgs, Task> _handler;
readonly Func<MqttServerClientUnsubscribedTopicEventArgs, Task> _handler;

public MqttServerClientUnsubscribedTopicHandlerDelegate(Action<MqttServerClientUnsubscribedTopicEventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 4
- 3
Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerConnectionValidatorDelegate : IMqttServerConnectionValidator
public sealed class MqttServerConnectionValidatorDelegate : IMqttServerConnectionValidator
{
private readonly Func<MqttConnectionValidatorContext, Task> _callback;
readonly Func<MqttConnectionValidatorContext, Task> _callback;

public MqttServerConnectionValidatorDelegate(Action<MqttConnectionValidatorContext> callback)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_callback = context =>
{
callback(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 1
- 1
Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs 查看文件

@@ -17,7 +17,7 @@ namespace MQTTnet.Server
_callback = context =>
{
callback(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 1
- 0
Source/MQTTnet/Server/MqttServerOptions.cs 查看文件

@@ -1,4 +1,5 @@
using System;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server
{


+ 4
- 3
Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerStartedHandlerDelegate : IMqttServerStartedHandler
public sealed class MqttServerStartedHandlerDelegate : IMqttServerStartedHandler
{
private readonly Func<EventArgs, Task> _handler;
readonly Func<EventArgs, Task> _handler;

public MqttServerStartedHandlerDelegate(Action<EventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 4
- 3
Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerStoppedHandlerDelegate : IMqttServerStoppedHandler
public sealed class MqttServerStoppedHandlerDelegate : IMqttServerStoppedHandler
{
private readonly Func<EventArgs, Task> _handler;
readonly Func<EventArgs, Task> _handler;

public MqttServerStoppedHandlerDelegate(Action<EventArgs> handler)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 4
- 3
Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs 查看文件

@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Implementations;

namespace MQTTnet.Server
{
public class MqttServerSubscriptionInterceptorDelegate : IMqttServerSubscriptionInterceptor
public sealed class MqttServerSubscriptionInterceptorDelegate : IMqttServerSubscriptionInterceptor
{
private readonly Func<MqttSubscriptionInterceptorContext, Task> _callback;
readonly Func<MqttSubscriptionInterceptorContext, Task> _callback;

public MqttServerSubscriptionInterceptorDelegate(Action<MqttSubscriptionInterceptorContext> callback)
{
@@ -14,7 +15,7 @@ namespace MQTTnet.Server
_callback = context =>
{
callback(context);
return Task.FromResult(0);
return PlatformAbstractionLayer.CompletedTask;
};
}



+ 3
- 10
Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs 查看文件

@@ -2,20 +2,13 @@

namespace MQTTnet.Server
{
public class MqttSubscriptionInterceptorContext
public sealed class MqttSubscriptionInterceptorContext
{
public MqttSubscriptionInterceptorContext(string clientId, MqttTopicFilter topicFilter, IDictionary<object, object> sessionItems)
{
ClientId = clientId;
TopicFilter = topicFilter;
SessionItems = sessionItems;
}

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

/// <summary>
/// Gets or sets the topic filter.
@@ -26,7 +19,7 @@ namespace MQTTnet.Server
/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary<object, object> SessionItems { get; }
public IDictionary<object, object> SessionItems { get; internal set; }

public bool AcceptSubscription { get; set; } = true;



+ 4
- 11
Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs 查看文件

@@ -2,32 +2,25 @@

namespace MQTTnet.Server
{
public class MqttUnsubscriptionInterceptorContext
public sealed class MqttUnsubscriptionInterceptorContext
{
public MqttUnsubscriptionInterceptorContext(string clientId, string topic, IDictionary<object, object> sessionItems)
{
ClientId = clientId;
Topic = topic;
SessionItems = sessionItems;
}

/// <summary>
/// Gets the client identifier.
/// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues.
/// </summary>
public string ClientId { get; }
public string ClientId { get; internal set; }

/// <summary>
/// Gets or sets the MQTT topic.
/// In MQTT, the word topic refers to an UTF-8 string that the broker uses to filter messages for each connected client.
/// The topic consists of one or more topic levels. Each topic level is separated by a forward slash (topic level separator).
/// </summary>
public string Topic { get; set; }
public string Topic { get; internal set; }

/// <summary>
/// Gets or sets a key/value collection that can be used to share data within the scope of this session.
/// </summary>
public IDictionary<object, object> SessionItems { get; }
public IDictionary<object, object> SessionItems { get; internal set; }

public bool AcceptUnsubscription { get; set; } = true;



+ 0
- 9
Source/MQTTnet/Server/PrepareClientSessionResult.cs 查看文件

@@ -1,9 +0,0 @@
namespace MQTTnet.Server
{
public class PrepareClientSessionResult
{
public bool IsExistingSession { get; set; }

public MqttClientConnection Session { get; set; }
}
}

+ 4
- 0
Source/MQTTnet/Server/Status/IMqttClientStatus.cs 查看文件

@@ -16,8 +16,12 @@ namespace MQTTnet.Server.Status

MqttProtocolVersion ProtocolVersion { get; }

DateTime ConnectedTimestamp { get; set; }
DateTime LastPacketReceivedTimestamp { get; }

DateTime LastPacketSentTimestamp { get; set; }
DateTime LastNonKeepAlivePacketReceivedTimestamp { get; }

long ReceivedApplicationMessagesCount { get; }


+ 3
- 0
Source/MQTTnet/Server/Status/IMqttSessionStatus.cs 查看文件

@@ -11,6 +11,9 @@ namespace MQTTnet.Server.Status
/// </summary>
string ClientId { get; }

/// <summary>
/// Gets the count of messages which are not yet sent to the client but already queued.
/// </summary>
long PendingApplicationMessagesCount { get; }

IDictionary<object, object> Items { get; }


+ 6
- 3
Source/MQTTnet/Server/Status/MqttClientStatus.cs 查看文件

@@ -2,6 +2,7 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server.Status
{
@@ -24,9 +25,11 @@ namespace MQTTnet.Server.Status

public MqttProtocolVersion ProtocolVersion { get; set; }

public DateTime LastPacketReceivedTimestamp { get; set; }

public DateTime ConnectedTimestamp { get; set; }
public DateTime LastPacketReceivedTimestamp { get; set; }
public DateTime LastPacketSentTimestamp { get; set; }

public DateTime LastNonKeepAlivePacketReceivedTimestamp { get; set; }

@@ -43,7 +46,7 @@ namespace MQTTnet.Server.Status
public long BytesSent { get; set; }

public long BytesReceived { get; set; }
public Task DisconnectAsync()
{
return _connection.StopAsync(MqttClientDisconnectReason.NormalDisconnection);


+ 5
- 1
Source/MQTTnet/Server/Status/MqttSessionStatus.cs 查看文件

@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Server.Internal;

namespace MQTTnet.Server.Status
{
@@ -25,8 +26,11 @@ namespace MQTTnet.Server.Status

public DateTime CreatedTimestamp { get; set; }

/// <summary>
/// This items can be used by the library user in order to store custom information.
/// </summary>
public IDictionary<object, object> Items { get; set; }

public Task DeleteAsync()
{
return _sessionsManager.DeleteSessionAsync(ClientId);


+ 1
- 0
Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs 查看文件

@@ -2,6 +2,7 @@
using BenchmarkDotNet.Jobs;
using MQTTnet.Server;
using System;
using MQTTnet.Server.Internal;

namespace MQTTnet.Benchmarks
{


+ 22
- 0
Tests/MQTTnet.Core.Tests/BaseTestClass.cs 查看文件

@@ -0,0 +1,22 @@
using System;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
{
public abstract class BaseTestClass
{
public TestContext TestContext { get; set; }
protected TestEnvironment CreateTestEnvironment()
{
return new TestEnvironment(TestContext);
}

protected Task LongDelay()
{
return Task.Delay(TimeSpan.FromSeconds(1));
}
}
}

+ 2
- 3
Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs 查看文件

@@ -16,7 +16,7 @@ using MQTTnet.Tests.Mockups;
namespace MQTTnet.Tests.MQTTv5
{
[TestClass]
public class Client_Tests
public sealed class Client_Tests
{
public TestContext TestContext { get; set; }

@@ -60,6 +60,7 @@ namespace MQTTnet.Tests.MQTTv5
Assert.AreEqual(2, receivedMessage.UserProperties.Count);
}
}
[TestMethod]
public async Task Connect_With_AssignedClientId()
{
@@ -116,7 +117,6 @@ namespace MQTTnet.Tests.MQTTv5
Assert.AreEqual("test123", serverConnectedClientId);
Assert.AreEqual("test123", serverDisconnectedClientId);
Assert.AreEqual("test123", clientAssignedClientId);

}
}

@@ -165,7 +165,6 @@ namespace MQTTnet.Tests.MQTTv5

Assert.AreEqual(1, result.Items.Count);
Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS1, result.Items[0].ResultCode);

}
}



+ 9
- 5
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs 查看文件

@@ -172,21 +172,25 @@ namespace MQTTnet.Tests.Mockups
return StartServerAsync(new MqttServerOptionsBuilder());
}

public async Task<IMqttServer> StartServerAsync(MqttServerOptionsBuilder options)
public IMqttServer CreateServer()
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (Server != null)
{
throw new InvalidOperationException("Server already started.");
}

Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(ServerLogger), TestContext, this);
return Server;
}

public async Task<IMqttServer> StartServerAsync(MqttServerOptionsBuilder options)
{
CreateServer();
options.WithDefaultEndpointPort(ServerPort);
options.WithMaxPendingMessagesPerClient(int.MaxValue);

await Server.StartAsync(options.Build()).ConfigureAwait(false);
await Server.StartAsync(options.Build());

return Server;
}


+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs 查看文件

@@ -50,7 +50,7 @@ namespace MQTTnet.Tests
Topic = "Test"
}, CancellationToken.None);
await Task.Delay(1000);
await Task.Delay(500);
// This will simulate a device which closes the connection directly
// after sending the data so do delay is added between send and dispose!


+ 1
- 0
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs 查看文件

@@ -5,6 +5,7 @@ using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using MQTTnet.Server.Internal;

namespace MQTTnet.Tests
{


+ 186
- 0
Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs 查看文件

@@ -0,0 +1,186 @@
using System;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Formatter;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTTnet.Tests
{
[TestClass]
public sealed class Server_Events_Tests : BaseTestClass
{
[TestMethod]
public async Task Fire_Client_Connected_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();

MqttServerClientConnectedEventArgs eventArgs = null;
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e =>
{
eventArgs = e;
});

await testEnvironment.ConnectClientAsync(o => o.WithCredentials("TheUser"));
await LongDelay();
Assert.IsNotNull(eventArgs);
Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Connected_Event)));
Assert.IsTrue(eventArgs.Endpoint.Contains("::1"));
Assert.AreEqual(MqttProtocolVersion.V311, eventArgs.ProtocolVersion);
Assert.AreEqual("TheUser", eventArgs.UserName);
}
}

[TestMethod]
public async Task Fire_Client_Disconnected_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();

MqttServerClientDisconnectedEventArgs eventArgs = null;
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(e =>
{
eventArgs = e;
});

var client = await testEnvironment.ConnectClientAsync(o => o.WithCredentials("TheUser"));
await client.DisconnectAsync();
await LongDelay();
Assert.IsNotNull(eventArgs);
Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Disconnected_Event)));
Assert.IsTrue(eventArgs.Endpoint.Contains("::1"));
Assert.AreEqual(MqttClientDisconnectType.Clean, eventArgs.DisconnectType);
}
}
[TestMethod]
public async Task Fire_Client_Subscribed_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();

MqttServerClientSubscribedTopicEventArgs eventArgs = null;
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(e =>
{
eventArgs = e;
});

var client = await testEnvironment.ConnectClientAsync();
await client.SubscribeAsync("The/Topic", MqttQualityOfServiceLevel.AtLeastOnce);
await LongDelay();
Assert.IsNotNull(eventArgs);
Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Subscribed_Event)));
Assert.AreEqual("The/Topic", eventArgs.TopicFilter.Topic);
Assert.AreEqual(MqttQualityOfServiceLevel.AtLeastOnce, eventArgs.TopicFilter.QualityOfServiceLevel);
}
}
[TestMethod]
public async Task Fire_Client_Unsubscribed_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();
MqttServerClientUnsubscribedTopicEventArgs eventArgs = null;
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
{
eventArgs = e;
});
var client = await testEnvironment.ConnectClientAsync();
await client.UnsubscribeAsync("The/Topic");
await LongDelay();
Assert.IsNotNull(eventArgs);
Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Unsubscribed_Event)));
Assert.AreEqual("The/Topic", eventArgs.TopicFilter);
}
}
[TestMethod]
public async Task Fire_Application_Message_Received_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();
MqttApplicationMessageReceivedEventArgs eventArgs = null;
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
{
eventArgs = e;
});
var client = await testEnvironment.ConnectClientAsync();
await client.PublishAsync("The_Topic", "The_Payload");
await LongDelay();
Assert.IsNotNull(eventArgs);
Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Application_Message_Received_Event)));
Assert.AreEqual("The_Topic", eventArgs.ApplicationMessage.Topic);
Assert.AreEqual("The_Payload", eventArgs.ApplicationMessage.ConvertPayloadToString());
}
}
[TestMethod]
public async Task Fire_Started_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = testEnvironment.CreateServer();
EventArgs eventArgs = null;
server.StartedHandler = new MqttServerStartedHandlerDelegate(e =>
{
eventArgs = e;
});

await server.StartAsync(new MqttServerOptionsBuilder().Build());
await LongDelay();
Assert.IsNotNull(eventArgs);
}
}
[TestMethod]
public async Task Fire_Stopped_Event()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServerAsync();
EventArgs eventArgs = null;
server.StoppedHandler = new MqttServerStoppedHandlerDelegate(e =>
{
eventArgs = e;
});

await server.StopAsync();
await LongDelay();
Assert.IsNotNull(eventArgs);
}
}
}
}

+ 2
- 4
Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs 查看文件

@@ -10,10 +10,8 @@ using MQTTnet.Server;
namespace MQTTnet.Tests
{
[TestClass]
public class Server_Status_Tests
public sealed class Server_Status_Tests : BaseTestClass
{
public TestContext TestContext { get; set; }

[TestMethod]
public async Task Show_Client_And_Session_Statistics()
{
@@ -143,7 +141,7 @@ namespace MQTTnet.Tests
// At most once will send one packet to the client and the server will reply
// with an additional ACK packet.
await c1.PublishAsync("a", string.Empty, MqttQualityOfServiceLevel.AtLeastOnce);
await Task.Delay(50);
await Task.Delay(250);

var clientStatus = await server.GetClientStatusAsync();



+ 42
- 5
Tests/MQTTnet.Core.Tests/Server_Tests.cs 查看文件

@@ -147,7 +147,7 @@ namespace MQTTnet.Tests
}

[TestMethod]
public async Task Will_Message_Do_Not_Send()
public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
@@ -155,7 +155,7 @@ namespace MQTTnet.Tests

await testEnvironment.StartServerAsync();

var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").Build();

var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);

@@ -171,6 +171,38 @@ namespace MQTTnet.Tests
Assert.AreEqual(0, receivedMessagesCount);
}
}
[TestMethod]
public async Task Will_Message_Do_Not_Send_On_Takeover()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
var receivedMessagesCount = 0;

await testEnvironment.StartServerAsync();
// C1 will receive the last will!
var c1 = await testEnvironment.ConnectClientAsync();
c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

// C2 has the last will defined.
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").Build();
var clientOptions = new MqttClientOptionsBuilder()
.WithWillMessage(willMessage)
.WithClientId("WillOwner");
var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
// C3 will do the connection takeover.
var c3 = await testEnvironment.ConnectClientAsync(clientOptions);
await Task.Delay(1000);

Assert.AreEqual(0, receivedMessagesCount);
}
}

[TestMethod]
public async Task Will_Message_Send()
@@ -571,7 +603,12 @@ namespace MQTTnet.Tests
using (var testEnvironment = new TestEnvironment(TestContext))
{
var server = await testEnvironment.StartServerAsync();
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(async e =>
{
// Every client will automatically subscribe to this topic.
await server.SubscribeAsync(e.ClientId, "topic1");
});

var client = await testEnvironment.ConnectClientAsync();
var receivedMessages = new List<MqttApplicationMessage>();
@@ -636,11 +673,11 @@ namespace MQTTnet.Tests
Assert.AreEqual(1, clientConnectedCalled);
Assert.AreEqual(0, clientDisconnectedCalled);

await Task.Delay(500);
await Task.Delay(1000);

await c1.DisconnectAsync();

await Task.Delay(500);
await Task.Delay(1000);

Assert.AreEqual(1, clientConnectedCalled);
Assert.AreEqual(1, clientDisconnectedCalled);


+ 1
- 0
Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs 查看文件

@@ -1,5 +1,6 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Server;
using MQTTnet.Server.Internal;

namespace MQTTnet.Tests
{


+ 5
- 0
Tests/MQTTnet.TestApp.NetCore/Program.cs 查看文件

@@ -31,6 +31,7 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine("a = Start QoS 2 benchmark");
Console.WriteLine("b = Start QoS 1 benchmark");
Console.WriteLine("c = Start QoS 0 benchmark");
Console.WriteLine("d = Start server with logging");

var pressedKey = Console.ReadKey(true);
if (pressedKey.KeyChar == '1')
@@ -83,6 +84,10 @@ namespace MQTTnet.TestApp.NetCore
{
Task.Run(PerformanceTest.RunQoS0Test);
}
else if (pressedKey.KeyChar == 'd')
{
Task.Run(ServerTest.RunEmptyServerWithLogging);
}

Thread.Sleep(Timeout.Infinite);
}


+ 15
- 0
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs 查看文件

@@ -2,8 +2,10 @@
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Server.Internal;

namespace MQTTnet.TestApp.NetCore
{
@@ -13,6 +15,19 @@ namespace MQTTnet.TestApp.NetCore
{
var mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult();
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}

public static void RunEmptyServerWithLogging()
{
var logger = new MqttNetLogger();
MqttNetConsoleLogger.ForwardToConsole(logger);
var mqttFactory = new MqttFactory(logger);
var mqttServer = mqttFactory.CreateMqttServer();
mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult();

Console.WriteLine("Press any key to exit.");
Console.ReadLine();


Loading…
取消
儲存