From cd8940f9b7cc7fc46c1465ae266aba6fdd2b546c Mon Sep 17 00:00:00 2001 From: Christian Date: Wed, 8 Sep 2021 18:01:56 +0200 Subject: [PATCH] Improve internal server connection management. (#1232) --- Build/MQTTnet.nuspec | 4 + Source/MQTTnet/Client/MqttClient.cs | 6 +- .../Options/MqttClientOptionsBuilder.cs | 14 +- ...plicationMessageReceivedHandlerDelegate.cs | 7 +- .../MQTTnet/Formatter/IMqttDataConverter.cs | 3 +- Source/MQTTnet/MqttTopicFilter.cs | 1 + Source/MQTTnet/Protocol/MqttRetainHandling.cs | 2 + .../CheckSubscriptionsResult.cs | 2 +- .../{ => Internal}/MqttClientConnection.cs | 476 +++++++++--------- .../MqttClientConnectionStatistics.cs | 90 ++++ .../{ => Internal}/MqttClientSession.cs | 10 +- ...ttClientSessionApplicationMessagesQueue.cs | 8 +- .../MqttClientSessionsManager.cs | 219 ++++---- .../MqttClientSubscriptionsManager.cs | 63 ++- .../MqttRetainedMessagesManager.cs | 8 +- .../MqttServerEventDispatcher.cs | 37 +- .../MqttServerKeepAliveMonitor.cs | 14 +- .../{ => Internal}/MqttTopicFilterComparer.cs | 2 +- ...qttApplicationMessageInterceptorContext.cs | 16 +- .../Server/MqttClientConnectionStatus.cs | 11 - ...qttClientMessageQueueInterceptorContext.cs | 14 +- ...ttClientMessageQueueInterceptorDelegate.cs | 7 +- .../Server/MqttConnectionValidatorContext.cs | 14 +- .../Server/MqttPendingApplicationMessage.cs | 4 +- .../Server/MqttQueuedApplicationMessage.cs | 2 +- Source/MQTTnet/Server/MqttServer.cs | 1 + ...erApplicationMessageInterceptorDelegate.cs | 3 +- .../MqttServerClientConnectedEventArgs.cs | 27 +- ...qttServerClientConnectedHandlerDelegate.cs | 7 +- .../MqttServerClientDisconnectedEventArgs.cs | 15 +- ...ServerClientDisconnectedHandlerDelegate.cs | 7 +- ...qttServerClientSubscribedTopicEventArgs.cs | 12 +- ...verClientSubscribedTopicHandlerDelegate.cs | 23 +- ...tServerClientUnsubscribedTopicEventArgs.cs | 12 +- ...rClientUnsubscribedTopicHandlerDelegate.cs | 7 +- .../MqttServerConnectionValidatorDelegate.cs | 7 +- ...edApplicationMessageInterceptorDelegate.cs | 2 +- Source/MQTTnet/Server/MqttServerOptions.cs | 1 + .../MqttServerStartedHandlerDelegate.cs | 7 +- .../MqttServerStoppedHandlerDelegate.cs | 7 +- ...ttServerSubscriptionInterceptorDelegate.cs | 7 +- .../MqttSubscriptionInterceptorContext.cs | 13 +- .../MqttUnsubscriptionInterceptorContext.cs | 15 +- .../Server/PrepareClientSessionResult.cs | 9 - .../Server/Status/IMqttClientStatus.cs | 4 + .../Server/Status/IMqttSessionStatus.cs | 3 + .../MQTTnet/Server/Status/MqttClientStatus.cs | 9 +- .../Server/Status/MqttSessionStatus.cs | 6 +- .../TopicFilterComparerBenchmark.cs | 1 + Tests/MQTTnet.Core.Tests/BaseTestClass.cs | 22 + .../MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs | 5 +- .../Mockups/TestEnvironment.cs | 14 +- Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs | 2 +- .../MqttSubscriptionsManager_Tests.cs | 1 + .../MQTTnet.Core.Tests/Server_Events_Tests.cs | 186 +++++++ .../MQTTnet.Core.Tests/Server_Status_Tests.cs | 6 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 47 +- .../TopicFilterComparer_Tests.cs | 1 + Tests/MQTTnet.TestApp.NetCore/Program.cs | 5 + Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 15 + 60 files changed, 967 insertions(+), 576 deletions(-) rename Source/MQTTnet/Server/{ => Internal}/CheckSubscriptionsResult.cs (96%) rename Source/MQTTnet/Server/{ => Internal}/MqttClientConnection.cs (58%) create mode 100644 Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs rename Source/MQTTnet/Server/{ => Internal}/MqttClientSession.cs (96%) rename Source/MQTTnet/Server/{ => Internal}/MqttClientSessionApplicationMessagesQueue.cs (97%) rename Source/MQTTnet/Server/{ => Internal}/MqttClientSessionsManager.cs (76%) rename Source/MQTTnet/Server/{ => Internal}/MqttClientSubscriptionsManager.cs (84%) rename Source/MQTTnet/Server/{ => Internal}/MqttRetainedMessagesManager.cs (98%) rename Source/MQTTnet/Server/{ => Internal}/MqttServerEventDispatcher.cs (76%) rename Source/MQTTnet/Server/{ => Internal}/MqttServerKeepAliveMonitor.cs (92%) rename Source/MQTTnet/Server/{ => Internal}/MqttTopicFilterComparer.cs (99%) delete mode 100644 Source/MQTTnet/Server/MqttClientConnectionStatus.cs delete mode 100644 Source/MQTTnet/Server/PrepareClientSessionResult.cs create mode 100644 Tests/MQTTnet.Core.Tests/BaseTestClass.cs create mode 100644 Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 32bb327..5e0bce9 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -13,6 +13,10 @@ MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol. * [ManagedClient] Extended ReconnectAsync (thanks to @nvsnkv, #1202). +* [Server] Fixed a memory/performance leak when using QoS Level 1. +* [Server] Exposed connection timestamp in client status. +* [Server] Refactored connection management code. +* [Server] Exposed more details in _MqttServerClientConnectedEventArgs_. * [MQTTnet.Server] Moved server project to a dedicated GitHub repository. * [MQTTnet, MQTTnet.Extensions.ManagedClient] Fixed bug that allowed invalid subscriptions (Thanks to @marcelwinh). Git commit: $gitCommit diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 8163fc0..2e8bf91 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -363,9 +363,9 @@ namespace MQTTnet.Client await Task.WhenAll(receiverTask, publishPacketReceiverTask, keepAliveTask).ConfigureAwait(false); } - catch (Exception e) + catch (Exception innerException) { - _logger.Warning(e, "Error while waiting for internal tasks."); + _logger.Warning(innerException, "Error while waiting for internal tasks."); } finally { @@ -439,7 +439,7 @@ namespace MQTTnet.Client { if (exception is MqttCommunicationTimedOutException) { - _logger.Warning(null, "Timeout while waiting for response packet ({0}).", typeof(TResponsePacket).Name); + _logger.Warning("Timeout while waiting for response packet ({0}).", typeof(TResponsePacket).Name); } throw; diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs index 5bd4eac..a8560b3 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs @@ -133,7 +133,7 @@ namespace MQTTnet.Client.Options return this; } - public MqttClientOptionsBuilder WithCredentials(string username, string password = null) + public MqttClientOptionsBuilder WithCredentials(string username, string password) { byte[] passwordBuffer = null; @@ -145,7 +145,7 @@ namespace MQTTnet.Client.Options return WithCredentials(username, passwordBuffer); } - public MqttClientOptionsBuilder WithCredentials(string username, byte[] password = null) + public MqttClientOptionsBuilder WithCredentials(string username, byte[] password) { _options.Credentials = new MqttClientCredentials { @@ -155,6 +155,16 @@ namespace MQTTnet.Client.Options return this; } + + public MqttClientOptionsBuilder WithCredentials(string username) + { + _options.Credentials = new MqttClientCredentials + { + Username = username + }; + + return this; + } public MqttClientOptionsBuilder WithCredentials(IMqttClientCredentials credentials) { diff --git a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs index 1d73aaa..dd25bb5 100644 --- a/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs +++ b/Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Client.Receiving { - public class MqttApplicationMessageReceivedHandlerDelegate : IMqttApplicationMessageReceivedHandler + public sealed class MqttApplicationMessageReceivedHandlerDelegate : IMqttApplicationMessageReceivedHandler { - private readonly Func _handler; + readonly Func _handler; public MqttApplicationMessageReceivedHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Client.Receiving _handler = context => { handler(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Formatter/IMqttDataConverter.cs b/Source/MQTTnet/Formatter/IMqttDataConverter.cs index aec4702..71fc30b 100644 --- a/Source/MQTTnet/Formatter/IMqttDataConverter.cs +++ b/Source/MQTTnet/Formatter/IMqttDataConverter.cs @@ -8,7 +8,6 @@ using MQTTnet.Client.Unsubscribing; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; -using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult; namespace MQTTnet.Formatter { @@ -22,7 +21,7 @@ namespace MQTTnet.Formatter MqttClientPublishResult CreateClientPublishResult(MqttPubRecPacket pubRecPacket, MqttPubCompPacket pubCompPacket); - MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket); + Client.Subscribing.MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket); MqttClientUnsubscribeResult CreateClientUnsubscribeResult(MqttUnsubscribePacket unsubscribePacket, MqttUnsubAckPacket unsubAckPacket); diff --git a/Source/MQTTnet/MqttTopicFilter.cs b/Source/MQTTnet/MqttTopicFilter.cs index fda60e0..e9cea55 100644 --- a/Source/MQTTnet/MqttTopicFilter.cs +++ b/Source/MQTTnet/MqttTopicFilter.cs @@ -8,6 +8,7 @@ namespace MQTTnet { } + // TODO: Consider using struct instead. public class MqttTopicFilter { /// diff --git a/Source/MQTTnet/Protocol/MqttRetainHandling.cs b/Source/MQTTnet/Protocol/MqttRetainHandling.cs index 8ee50cc..4f3fb46 100644 --- a/Source/MQTTnet/Protocol/MqttRetainHandling.cs +++ b/Source/MQTTnet/Protocol/MqttRetainHandling.cs @@ -3,7 +3,9 @@ public enum MqttRetainHandling { SendAtSubscribe = 0, + SendAtSubscribeIfNewSubscriptionOnly = 1, + DoNotSendOnSubscribe = 2 } } diff --git a/Source/MQTTnet/Server/CheckSubscriptionsResult.cs b/Source/MQTTnet/Server/Internal/CheckSubscriptionsResult.cs similarity index 96% rename from Source/MQTTnet/Server/CheckSubscriptionsResult.cs rename to Source/MQTTnet/Server/Internal/CheckSubscriptionsResult.cs index eb66f17..18a02fd 100644 --- a/Source/MQTTnet/Server/CheckSubscriptionsResult.cs +++ b/Source/MQTTnet/Server/Internal/CheckSubscriptionsResult.cs @@ -1,6 +1,6 @@ using MQTTnet.Protocol; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public struct CheckSubscriptionsResult { diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/Internal/MqttClientConnection.cs similarity index 58% rename from Source/MQTTnet/Server/MqttClientConnection.cs rename to Source/MQTTnet/Server/Internal/MqttClientConnection.cs index 8f33bb8..866624a 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientConnection.cs @@ -1,94 +1,82 @@ -using MQTTnet.Adapter; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Adapter; using MQTTnet.Client; +using MQTTnet.Client.Disconnecting; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; -using MQTTnet.Formatter; using MQTTnet.Implementations; using MQTTnet.Internal; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Client.Disconnecting; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttClientConnection : IDisposable { readonly Dictionary _topicAlias = new Dictionary(); readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); - readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource(); - + readonly IMqttRetainedMessagesManager _retainedMessagesManager; readonly MqttClientSessionsManager _sessionsManager; + readonly IMqttChannelAdapter _channelAdapter; readonly IMqttNetScopedLogger _logger; readonly IMqttServerOptions _serverOptions; - readonly IMqttChannelAdapter _channelAdapter; - readonly MqttConnectionValidatorContext _connectionValidatorContext; - readonly IMqttDataConverter _dataConverter; readonly string _endpoint; - readonly DateTime _connectedTimestamp; - - volatile Task _packageReceiverTask; - DateTime _lastNonKeepAlivePacketReceivedTimestamp; - long _receivedPacketsCount; - long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. - long _receivedApplicationMessagesCount; - long _sentApplicationMessagesCount; - MqttClientDisconnectReason _disconnectReason; + CancellationTokenSource _cancellationToken; - public MqttClientConnection(MqttConnectPacket connectPacket, + public MqttClientConnection( + MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttClientSession session, - MqttConnectionValidatorContext connectionValidatorContext, IMqttServerOptions serverOptions, MqttClientSessionsManager sessionsManager, IMqttRetainedMessagesManager retainedMessagesManager, IMqttNetLogger logger) { - Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); _channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter)); - _connectionValidatorContext = connectionValidatorContext ?? throw new ArgumentNullException(nameof(connectionValidatorContext)); - _dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter; - _endpoint = _channelAdapter.Endpoint; + _endpoint = channelAdapter.Endpoint; + + Session = session ?? throw new ArgumentNullException(nameof(session)); ConnectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateScopedLogger(nameof(MqttClientConnection)); - - _connectedTimestamp = DateTime.UtcNow; - LastPacketReceivedTimestamp = _connectedTimestamp; - _lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp; } - public MqttClientConnectionStatus Status { get; private set; } = MqttClientConnectionStatus.Initializing; + public string ClientId => ConnectPacket.ClientId; - public MqttConnectPacket ConnectPacket { get; } + public string Endpoint => _endpoint; - public string ClientId => ConnectPacket.ClientId; + public MqttClientConnectionStatistics Statistics { get; } = new MqttClientConnectionStatistics(); - public bool IsReadingPacket => _channelAdapter.IsReadingPacket; + public bool IsRunning { get; private set; } + + public MqttConnectPacket ConnectPacket { get; } - public DateTime LastPacketReceivedTimestamp { get; private set; } + public bool IsReadingPacket => _channelAdapter.IsReadingPacket; public MqttClientSession Session { get; } + public bool IsTakenOver { get; set; } + + public bool IsCleanDisconnect { get; private set; } + public async Task StopAsync(MqttClientDisconnectReason reason) { - Status = MqttClientConnectionStatus.Finalizing; - _disconnectReason = reason; + IsRunning = false; if (reason == MqttClientDisconnectReason.SessionTakenOver || reason == MqttClientDisconnectReason.KeepAliveTimeout) { @@ -96,30 +84,10 @@ namespace MQTTnet.Server // token because the entire connection is closed (disposed) as soon as the cancellation // token is cancelled. To there is no chance that the DISCONNECT packet will ever arrive // at the client! - try - { - var disconnectOptions = new MqttClientDisconnectOptions - { - ReasonCode = reason, - ReasonString = reason.ToString() - }; - - var disconnectPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(disconnectOptions); - - using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout)) - { - await _channelAdapter.SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false); - } - } - catch (Exception exception) - { - _logger.Warning(exception, "Client '{0}': Error while sending DISCONNECT packet after takeover.", ClientId); - } + await TrySendDisconnectPacket(reason).ConfigureAwait(false); } StopInternal(); - - await (_packageReceiverTask ?? PlatformAbstractionLayer.CompletedTask); } public void ResetStatistics() @@ -127,24 +95,16 @@ namespace MQTTnet.Server _channelAdapter.ResetStatistics(); } - public void FillStatus(MqttClientStatus status) + public void FillClientStatus(MqttClientStatus clientStatus) { - status.ClientId = ClientId; - status.Endpoint = _endpoint; - status.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion; + clientStatus.ClientId = ClientId; + clientStatus.Endpoint = _endpoint; - status.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount); - status.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount); + clientStatus.ProtocolVersion = _channelAdapter.PacketFormatterAdapter.ProtocolVersion; + clientStatus.BytesSent = _channelAdapter.BytesSent; + clientStatus.BytesReceived = _channelAdapter.BytesReceived; - status.ReceivedPacketsCount = Interlocked.Read(ref _receivedPacketsCount); - status.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount); - - status.ConnectedTimestamp = _connectedTimestamp; - status.LastPacketReceivedTimestamp = LastPacketReceivedTimestamp; - status.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp; - - status.BytesSent = _channelAdapter.BytesSent; - status.BytesReceived = _channelAdapter.BytesReceived; + Statistics.FillClientStatus(clientStatus); } public void Dispose() @@ -152,31 +112,60 @@ namespace MQTTnet.Server _cancellationToken.Dispose(); } - public Task RunAsync() + public async Task RunAsync() { - _packageReceiverTask = RunInternalAsync(_cancellationToken.Token); - return _packageReceiverTask; - } + _logger.Info("Client '{0}': Session started.", ClientId); - async Task RunInternalAsync(CancellationToken cancellationToken) - { - var disconnectType = MqttClientDisconnectType.NotClean; - try + Session.WillMessage = ConnectPacket.WillMessage; + + using (var cancellationToken = new CancellationTokenSource()) { - _logger.Info("Client '{0}': Session started.", ClientId); + _cancellationToken = cancellationToken; + + try + { + Task.Run(() => SendPacketsLoop(cancellationToken.Token), cancellationToken.Token).RunInBackground(_logger); - Session.WillMessage = ConnectPacket.WillMessage; + Session.IsCleanSession = false; + + IsRunning = true; + + await ReceivePackagesLoop(cancellationToken.Token).ConfigureAwait(false); + } + finally + { + IsRunning = false; + + cancellationToken.Cancel(); + _cancellationToken = null; + } + } + + _packetDispatcher.CancelAll(); - await SendAsync(_channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(_connectionValidatorContext), cancellationToken).ConfigureAwait(false); + if (!IsTakenOver && !IsCleanDisconnect && Session.WillMessage != null) + { + _sessionsManager.DispatchApplicationMessage(Session.WillMessage, this); + Session.WillMessage = null; + } - Task.Run(() => SendPendingPacketsAsync(cancellationToken), cancellationToken).RunInBackground(_logger); + _logger.Info("Client '{0}': Connection stopped.", ClientId); + } - Session.IsCleanSession = false; + Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { + return _channelAdapter.SendPacketAsync(packet, cancellationToken).ContinueWith(task => { Statistics.HandleSentPacket(packet); }, cancellationToken); + } + async Task ReceivePackagesLoop(CancellationToken cancellationToken) + { + try + { + // We do not listen for the cancellation token here because the internal buffer might still + // contain data to be read even if the TCP connection was already dropped. So we rely on an + // own exception in the reading loop! while (!cancellationToken.IsCancellationRequested) { - Status = MqttClientConnectionStatus.Running; - var packet = await _channelAdapter.ReceivePacketAsync(cancellationToken).ConfigureAwait(false); if (packet == null) { @@ -184,13 +173,7 @@ namespace MQTTnet.Server return; } - Interlocked.Increment(ref _sentPacketsCount); - LastPacketReceivedTimestamp = DateTime.UtcNow; - - if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket)) - { - _lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp; - } + Statistics.HandleReceivedPacket(packet); if (packet is MqttPublishPacket publishPacket) { @@ -210,7 +193,8 @@ namespace MQTTnet.Server } else if (packet is MqttPingReqPacket) { - await SendAsync(MqttPingRespPacket.Instance, cancellationToken).ConfigureAwait(false); + // See: The Server MUST send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1]. + await SendPacketAsync(MqttPingRespPacket.Instance, cancellationToken).ConfigureAwait(false); } else if (packet is MqttPingRespPacket) { @@ -218,10 +202,7 @@ namespace MQTTnet.Server } else if (packet is MqttDisconnectPacket) { - Session.WillMessage = null; - disconnectType = MqttClientDisconnectType.Clean; - - StopInternal(); + IsCleanDisconnect = true; return; } else @@ -246,40 +227,111 @@ namespace MQTTnet.Server { _logger.Error(exception, "Client '{0}': Error while receiving client packets.", ClientId); } - - StopInternal(); } - finally + } + + async Task SendPacketsLoop(CancellationToken cancellationToken) + { + MqttQueuedApplicationMessage queuedApplicationMessage = null; + MqttPublishPacket publishPacket = null; + + try { - if (_disconnectReason == MqttClientDisconnectReason.SessionTakenOver) + while (!cancellationToken.IsCancellationRequested) { - disconnectType = MqttClientDisconnectType.Takeover; - } + queuedApplicationMessage = await Session.ApplicationMessagesQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); + if (queuedApplicationMessage == null) + { + return; + } - if (Session.WillMessage != null) - { - _sessionsManager.DispatchApplicationMessage(Session.WillMessage, this); - Session.WillMessage = null; - } + if (cancellationToken.IsCancellationRequested) + { + return; + } - _packetDispatcher.CancelAll(); + publishPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePublishPacket(queuedApplicationMessage.ApplicationMessage); + publishPacket.QualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel; - _logger.Info("Client '{0}': Connection stopped.", ClientId); + // Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9]. + publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage; - try + publishPacket = await InvokeClientMessageQueueInterceptor(publishPacket, queuedApplicationMessage).ConfigureAwait(false); + if (publishPacket == null) + { + // The interceptor has decided that the message is not relevant and will be fully ignored. + continue; + } + + if (publishPacket.QualityOfServiceLevel > 0) + { + publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); + } + + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) + { + await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); + } + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + { + using (var awaiter = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier)) + { + await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); + await awaiter.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); + } + } + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) + { + using (var awaiter1 = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier)) + using (var awaiter2 = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier)) + { + await SendPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false); + var pubRecPacket = await awaiter1.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); + + var pubRelPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success); + await SendPacketAsync(pubRelPacket, cancellationToken).ConfigureAwait(false); + + await awaiter2.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); + } + } + + _logger.Verbose("Client '{0}': Queued application message sent.", ClientId); + } + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + if (exception is MqttCommunicationTimedOutException) + { + _logger.Warning(exception, "Client '{0}': Sending publish packet failed: Timeout.", ClientId); + } + else if (exception is MqttCommunicationException) { - await _sessionsManager.CleanUpClient(ClientId, _channelAdapter, disconnectType); + _logger.Warning(exception, "Client '{0}': Sending publish packet failed: Communication exception.", ClientId); } - catch (Exception e) + else { - _logger.Error(e, "Client '{0}': Error while cleaning up", ClientId); + _logger.Error(exception, "Client '{0}': Sending publish packet failed.", ClientId); } + + if (publishPacket?.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) + { + if (queuedApplicationMessage != null) + { + queuedApplicationMessage.IsDuplicate = true; + Session.ApplicationMessagesQueue.Enqueue(queuedApplicationMessage); + } + } + + StopInternal(); } } void StopInternal() { - _cancellationToken.Cancel(); + _cancellationToken?.Cancel(); } async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) @@ -294,7 +346,7 @@ namespace MQTTnet.Server Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken) { var pubCompPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubCompPacket(pubRelPacket, MqttApplicationMessageReceivedReasonCode.Success); - return SendAsync(pubCompPacket, cancellationToken); + return SendPacketAsync(pubCompPacket, cancellationToken); } async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) @@ -302,7 +354,7 @@ namespace MQTTnet.Server var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); var subAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateSubAckPacket(subscribePacket, subscribeResult); - await SendAsync(subAckPacket, cancellationToken).ConfigureAwait(false); + await SendPacketAsync(subAckPacket, cancellationToken).ConfigureAwait(false); if (subscribeResult.CloseConnection) { @@ -318,38 +370,36 @@ namespace MQTTnet.Server var reasonCodes = await Session.SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false); var unsubAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateUnsubAckPacket(unsubscribePacket, reasonCodes); - await SendAsync(unsubAckPacket, cancellationToken).ConfigureAwait(false); + await SendPacketAsync(unsubAckPacket, cancellationToken).ConfigureAwait(false); } Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken) { - Interlocked.Increment(ref _sentApplicationMessagesCount); - HandleTopicAlias(publishPacket); - var applicationMessage = _dataConverter.CreateApplicationMessage(publishPacket); + var applicationMessage = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateApplicationMessage(publishPacket); _sessionsManager.DispatchApplicationMessage(applicationMessage, this); switch (publishPacket.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtMostOnce: - { - return PlatformAbstractionLayer.CompletedTask; - } + { + return PlatformAbstractionLayer.CompletedTask; + } case MqttQualityOfServiceLevel.AtLeastOnce: - { - var pubAckPacket = _dataConverter.CreatePubAckPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success); - return SendAsync(pubAckPacket, cancellationToken); - } + { + var pubAckPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubAckPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success); + return SendPacketAsync(pubAckPacket, cancellationToken); + } case MqttQualityOfServiceLevel.ExactlyOnce: - { - var pubRecPacket = _dataConverter.CreatePubRecPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success); - return SendAsync(pubRecPacket, cancellationToken); - } + { + var pubRecPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRecPacket(publishPacket, MqttApplicationMessageReceivedReasonCode.Success); + return SendPacketAsync(pubRecPacket, cancellationToken); + } default: - { - throw new MqttCommunicationException("Received a not supported QoS level."); - } + { + throw new MqttCommunicationException("Received a not supported QoS level."); + } } } @@ -376,134 +426,64 @@ namespace MQTTnet.Server } else { - } } } } - - async Task SendPendingPacketsAsync(CancellationToken cancellationToken) + + async Task TrySendDisconnectPacket(MqttClientDisconnectReason reason) { - MqttQueuedApplicationMessage queuedApplicationMessage = null; - MqttPublishPacket publishPacket = null; - try { - while (!cancellationToken.IsCancellationRequested) + var disconnectOptions = new MqttClientDisconnectOptions { - queuedApplicationMessage = await Session.ApplicationMessagesQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - if (queuedApplicationMessage == null) - { - return; - } - - if (cancellationToken.IsCancellationRequested) - { - return; - } - - publishPacket = _dataConverter.CreatePublishPacket(queuedApplicationMessage.ApplicationMessage); - publishPacket.QualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel; - - // Set the retain flag to true according to [MQTT-3.3.1-8] and [MQTT-3.3.1-9]. - publishPacket.Retain = queuedApplicationMessage.IsRetainedMessage; - - if (_serverOptions.ClientMessageQueueInterceptor != null) - { - var context = new MqttClientMessageQueueInterceptorContext( - queuedApplicationMessage.SenderClientId, - ClientId, - queuedApplicationMessage.ApplicationMessage, - queuedApplicationMessage.SubscriptionQualityOfServiceLevel); - - if (_serverOptions.ClientMessageQueueInterceptor != null) - { - await _serverOptions.ClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(context).ConfigureAwait(false); - } - - if (!context.AcceptEnqueue || context.ApplicationMessage == null) - { - return; - } + ReasonCode = reason, + ReasonString = reason.ToString() + }; - publishPacket.Topic = context.ApplicationMessage.Topic; - publishPacket.Payload = context.ApplicationMessage.Payload; - publishPacket.QualityOfServiceLevel = context.SubscriptionQualityOfServiceLevel; - } - - if (publishPacket.QualityOfServiceLevel > 0) - { - publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNextPacketIdentifier(); - } + var disconnectPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreateDisconnectPacket(disconnectOptions); - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) - { - await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false); - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) - { - var awaiter = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier); - await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false); - await awaiter.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); - } - else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) - { - using (var awaiter1 = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier)) - using (var awaiter2 = _packetDispatcher.AddAwaiter(publishPacket.PacketIdentifier)) - { - await SendAsync(publishPacket, cancellationToken).ConfigureAwait(false); - var pubRecPacket = await awaiter1.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); - - var pubRelPacket = _channelAdapter.PacketFormatterAdapter.DataConverter.CreatePubRelPacket(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success); - await SendAsync(pubRelPacket, cancellationToken).ConfigureAwait(false); - - await awaiter2.WaitOneAsync(_serverOptions.DefaultCommunicationTimeout).ConfigureAwait(false); - } - } - - _logger.Verbose("Client '{0}': Queued application message sent.", ClientId); + using (var timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout)) + { + await SendPacketAsync(disconnectPacket, timeout.Token).ConfigureAwait(false); } } catch (Exception exception) { - if (exception is OperationCanceledException) - { - } - else if (exception is MqttCommunicationTimedOutException) - { - _logger.Warning(exception, "Client '{0}': Sending publish packet failed: Timeout.", ClientId); - } - else if (exception is MqttCommunicationException) - { - _logger.Warning(exception, "Client '{0}': Sending publish packet failed: Communication exception.", ClientId); - } - else - { - _logger.Error(exception, "Client '{0}': Sending publish packet failed.", ClientId); - } + _logger.Warning(exception, "Client '{{0}}': Error while sending DISCONNECT packet (Reason = {1}).", ClientId, reason); + } + } - if (publishPacket?.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) - { - queuedApplicationMessage.IsDuplicate = true; + async Task InvokeClientMessageQueueInterceptor(MqttPublishPacket publishPacket, MqttQueuedApplicationMessage queuedApplicationMessage) + { + if (_serverOptions.ClientMessageQueueInterceptor == null) + { + return publishPacket; + } - Session.ApplicationMessagesQueue.Enqueue(queuedApplicationMessage); - } + var context = new MqttClientMessageQueueInterceptorContext + { + SenderClientId = queuedApplicationMessage.SenderClientId, + ReceiverClientId = ClientId, + ApplicationMessage = queuedApplicationMessage.ApplicationMessage, + SubscriptionQualityOfServiceLevel = queuedApplicationMessage.SubscriptionQualityOfServiceLevel + }; - StopInternal(); + if (_serverOptions.ClientMessageQueueInterceptor != null) + { + await _serverOptions.ClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(context).ConfigureAwait(false); } - } - Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) - { - return _channelAdapter.SendPacketAsync(packet, cancellationToken).ContinueWith(task => + if (!context.AcceptEnqueue || context.ApplicationMessage == null) { - Interlocked.Increment(ref _receivedPacketsCount); + return null; + } - if (packet is MqttPublishPacket) - { - Interlocked.Increment(ref _receivedApplicationMessagesCount); - } - }, cancellationToken); + publishPacket.Topic = context.ApplicationMessage.Topic; + publishPacket.Payload = context.ApplicationMessage.Payload; + publishPacket.QualityOfServiceLevel = context.SubscriptionQualityOfServiceLevel; + + return publishPacket; } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs b/Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs new file mode 100644 index 0000000..14923f1 --- /dev/null +++ b/Source/MQTTnet/Server/Internal/MqttClientConnectionStatistics.cs @@ -0,0 +1,90 @@ +using System; +using System.Threading; +using MQTTnet.Packets; +using MQTTnet.Server.Status; + +namespace MQTTnet.Server.Internal +{ + public sealed class MqttClientConnectionStatistics + { + readonly DateTime _connectedTimestamp; + + DateTime _lastNonKeepAlivePacketReceivedTimestamp; + DateTime _lastPacketReceivedTimestamp; + DateTime _lastPacketSentTimestamp; + + // Start with 1 because the CONNACK packet is not counted here. + long _receivedPacketsCount = 1; + + // Start with 1 because the CONNECT packet is not counted here. + long _sentPacketsCount = 1; + + long _receivedApplicationMessagesCount; + long _sentApplicationMessagesCount; + + public MqttClientConnectionStatistics() + { + _connectedTimestamp = DateTime.UtcNow; + + _lastPacketReceivedTimestamp = _connectedTimestamp; + _lastPacketSentTimestamp = _connectedTimestamp; + + _lastNonKeepAlivePacketReceivedTimestamp = _connectedTimestamp; + } + + public DateTime LastPacketReceivedTimestamp => _lastPacketReceivedTimestamp; + + public void HandleReceivedPacket(MqttBasePacket packet) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + + // This class is tracking all values from Clients perspective! + _lastPacketSentTimestamp = DateTime.UtcNow; + + Interlocked.Increment(ref _sentPacketsCount); + + if (packet is MqttPublishPacket) + { + Interlocked.Increment(ref _sentApplicationMessagesCount); + } + + if (!(packet is MqttPingReqPacket || packet is MqttPingRespPacket)) + { + _lastNonKeepAlivePacketReceivedTimestamp = _lastPacketReceivedTimestamp; + } + } + + public void HandleSentPacket(MqttBasePacket packet) + { + if (packet == null) throw new ArgumentNullException(nameof(packet)); + + // This class is tracking all values from Clients perspective! + _lastPacketReceivedTimestamp = DateTime.UtcNow; + + Interlocked.Increment(ref _receivedPacketsCount); + + if (packet is MqttPublishPacket) + { + Interlocked.Increment(ref _receivedApplicationMessagesCount); + } + } + + public void FillClientStatus(MqttClientStatus clientStatus) + { + if (clientStatus == null) throw new ArgumentNullException(nameof(clientStatus)); + + clientStatus.ConnectedTimestamp = _connectedTimestamp; + + clientStatus.ReceivedPacketsCount = Interlocked.Read(ref _receivedPacketsCount); + clientStatus.SentPacketsCount = Interlocked.Read(ref _sentPacketsCount); + + clientStatus.ReceivedApplicationMessagesCount = Interlocked.Read(ref _receivedApplicationMessagesCount); + clientStatus.SentApplicationMessagesCount = Interlocked.Read(ref _sentApplicationMessagesCount); + + clientStatus.LastPacketReceivedTimestamp = _lastPacketReceivedTimestamp; + clientStatus.LastPacketSentTimestamp = _lastPacketSentTimestamp; + + clientStatus.LastNonKeepAlivePacketReceivedTimestamp = _lastNonKeepAlivePacketReceivedTimestamp; + } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/Internal/MqttClientSession.cs similarity index 96% rename from Source/MQTTnet/Server/MqttClientSession.cs rename to Source/MQTTnet/Server/Internal/MqttClientSession.cs index 620c4c9..524f70b 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSession.cs @@ -1,10 +1,10 @@ -using MQTTnet.Diagnostics; -using MQTTnet.Server.Status; -using System; +using System; using System.Collections.Generic; using System.Threading.Tasks; +using MQTTnet.Diagnostics; +using MQTTnet.Server.Status; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttClientSession { @@ -81,7 +81,7 @@ namespace MQTTnet.Server return SubscriptionsManager.UnsubscribeAsync(topicFilters); } - public void FillStatus(MqttSessionStatus status) + public void FillSessionStatus(MqttSessionStatus status) { status.ClientId = ClientId; status.CreatedTimestamp = _createdTimestamp; diff --git a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionApplicationMessagesQueue.cs similarity index 97% rename from Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs rename to Source/MQTTnet/Server/Internal/MqttClientSessionApplicationMessagesQueue.cs index 6ddba92..e24bde0 100644 --- a/Source/MQTTnet/Server/MqttClientSessionApplicationMessagesQueue.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionApplicationMessagesQueue.cs @@ -1,10 +1,10 @@ -using MQTTnet.Internal; -using MQTTnet.Protocol; -using System; +using System; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Internal; +using MQTTnet.Protocol; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttClientSessionApplicationMessagesQueue : IDisposable { diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs similarity index 76% rename from Source/MQTTnet/Server/MqttClientSessionsManager.cs rename to Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index ebc4398..80945be 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -1,4 +1,11 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using MQTTnet.Adapter; +using MQTTnet.Client.Disconnecting; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Formatter; @@ -6,15 +13,8 @@ using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server.Status; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Client.Disconnecting; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttClientSessionsManager : IDisposable { @@ -53,53 +53,66 @@ namespace MQTTnet.Server Task.Run(() => TryProcessQueuedApplicationMessagesAsync(cancellationToken), cancellationToken).RunInBackground(_logger); } - public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) + async Task ReceiveConnectPacket(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { try { - MqttConnectPacket connectPacket; - try + using (var timeoutToken = new CancellationTokenSource(_options.DefaultCommunicationTimeout)) + using (var effectiveCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken.Token, cancellationToken)) { - using (var timeoutToken = new CancellationTokenSource(_options.DefaultCommunicationTimeout)) - { - var firstPacket = await channelAdapter.ReceivePacketAsync(timeoutToken.Token).ConfigureAwait(false); - connectPacket = firstPacket as MqttConnectPacket; - if (connectPacket == null) - { - _logger.Warning(null, - "Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", - channelAdapter.Endpoint); + var firstPacket = await channelAdapter.ReceivePacketAsync(effectiveCancellationToken.Token).ConfigureAwait(false); - return; - } + if (firstPacket is MqttConnectPacket connectPacket) + { + return connectPacket; } } - catch (OperationCanceledException) - { - _logger.Warning(null, "Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); - return; - } - catch (MqttCommunicationTimedOutException) + } + catch (OperationCanceledException) + { + _logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); + } + catch (MqttCommunicationTimedOutException) + { + _logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); + } + + _logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint); + return null; + } + + public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) + { + MqttClientConnection clientConnection = null; + + try + { + var connectPacket = await ReceiveConnectPacket(channelAdapter, cancellationToken).ConfigureAwait(false); + if (connectPacket == null) { - _logger.Warning(null, "Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); + // Nothing was received in time etc. return; } - var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); - + MqttConnAckPacket connAckPacket; + + var connectionValidatorContext = await ValidateConnection(connectPacket, channelAdapter).ConfigureAwait(false); if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { - // Send failure response here without preparing a session. The result for a successful connect - // will be sent from the session itself. - var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); + // Send failure response here without preparing a session! + connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false); - return; } - var connection = await CreateClientConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); - await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket.ClientId).ConfigureAwait(false); - await connection.RunAsync().ConfigureAwait(false); + clientConnection = await CreateClientConnection(connectPacket, channelAdapter, connectionValidatorContext.SessionItems).ConfigureAwait(false); + + connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext); + await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false); + + await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket, channelAdapter).ConfigureAwait(false); + + await clientConnection.RunAsync().ConfigureAwait(false); } catch (OperationCanceledException) { @@ -108,6 +121,41 @@ namespace MQTTnet.Server { _logger.Error(exception, exception.Message); } + finally + { + if (clientConnection != null) + { + if (clientConnection.ClientId != null) + { + // in case it is a takeover _connections already contains the new connection + if (!clientConnection.IsTakenOver) + { + lock (_connections) + { + _connections.Remove(clientConnection.ClientId); + } + + if (!_options.EnablePersistentSessions) + { + await DeleteSessionAsync(clientConnection.ClientId).ConfigureAwait(false); + } + } + } + + var endpoint = clientConnection.Endpoint; + + if (clientConnection.ClientId != null && !clientConnection.IsTakenOver) + { + // The event is fired at a separate place in case of a handover! + await _eventDispatcher.SafeNotifyClientDisconnectedAsync( + clientConnection.ClientId, + clientConnection.IsCleanDisconnect ? MqttClientDisconnectType.Clean : MqttClientDisconnectType.NotClean, + endpoint).ConfigureAwait(false); + } + } + + await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); + } } public async Task CloseAllConnectionsAsync() @@ -132,7 +180,7 @@ namespace MQTTnet.Server return _connections.Values.ToList(); } } - + public Task> GetClientStatusAsync() { var result = new List(); @@ -142,10 +190,10 @@ namespace MQTTnet.Server foreach (var connection in _connections.Values) { var clientStatus = new MqttClientStatus(connection); - connection.FillStatus(clientStatus); + connection.FillClientStatus(clientStatus); var sessionStatus = new MqttSessionStatus(connection.Session, this); - connection.Session.FillStatus(sessionStatus); + connection.Session.FillSessionStatus(sessionStatus); clientStatus.Session = sessionStatus; result.Add(clientStatus); @@ -164,7 +212,7 @@ namespace MQTTnet.Server foreach (var session in _sessions.Values) { var sessionStatus = new MqttSessionStatus(session, this); - session.FillStatus(sessionStatus); + session.FillSessionStatus(sessionStatus); result.Add(sessionStatus); } @@ -203,7 +251,7 @@ namespace MQTTnet.Server { _connections.TryGetValue(clientId, out connection); } - + lock (_sessions) { _sessions.Remove(clientId); @@ -213,37 +261,8 @@ namespace MQTTnet.Server { await connection.StopAsync(MqttClientDisconnectReason.NormalDisconnection).ConfigureAwait(false); } - - _logger.Verbose("Session for client '{0}' deleted.", clientId); - } - - public async Task CleanUpClient(string clientId, IMqttChannelAdapter channelAdapter, MqttClientDisconnectType disconnectType) - { - if (clientId != null) - { - // in case it is a takeover _connections already contains the new connection - if (disconnectType != MqttClientDisconnectType.Takeover) - { - lock (_connections) - { - _connections.Remove(clientId); - } - - if (!_options.EnablePersistentSessions) - { - await DeleteSessionAsync(clientId).ConfigureAwait(false); - } - } - } - var endpoint = channelAdapter.Endpoint; - - await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false); - - if (clientId != null) - { - await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType, endpoint).ConfigureAwait(false); - } + _logger.Verbose("Session for client '{0}' deleted.", clientId); } public void Dispose() @@ -278,7 +297,7 @@ namespace MQTTnet.Server MqttPendingApplicationMessage queuedApplicationMessage; try { - queuedApplicationMessage = _messageQueue.Take(cancellationToken); + queuedApplicationMessage = _messageQueue.Take(cancellationToken); } catch (ArgumentNullException) { @@ -315,7 +334,7 @@ namespace MQTTnet.Server applicationMessage = interceptorContext.ApplicationMessage; } } - + await _eventDispatcher.SafeNotifyApplicationMessageReceivedAsync(senderClientId, applicationMessage).ConfigureAwait(false); if (applicationMessage.Retain) @@ -360,9 +379,12 @@ namespace MQTTnet.Server } } - async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) + async Task ValidateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { - var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); + var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter) + { + SessionItems = new ConcurrentDictionary() + }; var connectionValidator = _options.ConnectionValidator; @@ -388,9 +410,8 @@ namespace MQTTnet.Server return context; } - async Task CreateClientConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) + async Task CreateClientConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, IDictionary sessionItems) { - MqttClientConnection existingConnection; MqttClientConnection connection; using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false)) @@ -401,14 +422,14 @@ namespace MQTTnet.Server if (!_sessions.TryGetValue(connectPacket.ClientId, out session)) { _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); - session = CreateSession(connectPacket.ClientId, connectionValidatorContext); + session = CreateSession(connectPacket.ClientId, sessionItems); } else { if (connectPacket.CleanSession) { _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); - session = CreateSession(connectPacket.ClientId, connectionValidatorContext); + session = CreateSession(connectPacket.ClientId, sessionItems); } else { @@ -419,16 +440,21 @@ namespace MQTTnet.Server _sessions[connectPacket.ClientId] = session; } + MqttClientConnection existingConnection; + lock (_connections) { _connections.TryGetValue(connectPacket.ClientId, out existingConnection); - connection = CreateConnection(connectPacket, channelAdapter, session, connectionValidatorContext); + connection = CreateConnection(connectPacket, channelAdapter, session); _connections[connectPacket.ClientId] = connection; } if (existingConnection != null) { + await _eventDispatcher.SafeNotifyClientDisconnectedAsync(existingConnection.ClientId, MqttClientDisconnectType.Takeover, existingConnection.Endpoint); + + existingConnection.IsTakenOver = true; await existingConnection.StopAsync(MqttClientDisconnectReason.SessionTakenOver).ConfigureAwait(false); } } @@ -436,7 +462,8 @@ namespace MQTTnet.Server return connection; } - async Task InterceptApplicationMessageAsync(IMqttServerApplicationMessageInterceptor interceptor, MqttClientConnection clientConnection, MqttApplicationMessage applicationMessage) + async Task InterceptApplicationMessageAsync(IMqttServerApplicationMessageInterceptor interceptor, MqttClientConnection clientConnection, + MqttApplicationMessage applicationMessage) { string senderClientId; IDictionary sessionItems; @@ -453,29 +480,20 @@ namespace MQTTnet.Server sessionItems = clientConnection.Session.Items; } - var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, _logger) + var interceptorContext = new MqttApplicationMessageInterceptorContext { + ClientId = senderClientId, + SessionItems = sessionItems, + Logger = _logger, AcceptPublish = true, ApplicationMessage = applicationMessage, - CloseConnection = false + CloseConnection = false }; await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false); return interceptorContext; } - async Task SafeCleanupChannelAsync(IMqttChannelAdapter channelAdapter) - { - try - { - await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); - } - catch (Exception exception) - { - _logger.Error(exception, "Error while disconnecting client channel."); - } - } - MqttClientSession GetSession(string clientId) { lock (_sessions) @@ -489,24 +507,23 @@ namespace MQTTnet.Server } } - MqttClientConnection CreateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttClientSession session, MqttConnectionValidatorContext connectionValidatorContext) + MqttClientConnection CreateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttClientSession session) { return new MqttClientConnection( connectPacket, channelAdapter, session, - connectionValidatorContext, _options, this, _retainedMessagesManager, _rootLogger); } - MqttClientSession CreateSession(string clientId, MqttConnectionValidatorContext connectionValidatorContext) + MqttClientSession CreateSession(string clientId, IDictionary sessionItems) { return new MqttClientSession( clientId, - connectionValidatorContext.SessionItems, + sessionItems, _eventDispatcher, _options, _retainedMessagesManager, diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs similarity index 84% rename from Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs rename to Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs index 15066cc..09f1aea 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs @@ -1,12 +1,12 @@ -using MQTTnet.Packets; -using MQTTnet.Protocol; -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Packets; +using MQTTnet.Protocol; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttClientSubscriptionsManager { @@ -37,9 +37,11 @@ namespace MQTTnet.Server { var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false); - var finalTopicFilter = interceptorContext.TopicFilter; + var finalTopicFilter = interceptorContext?.TopicFilter ?? originalTopicFilter; + var acceptSubscription = interceptorContext?.AcceptSubscription ?? true; + var closeConnection = interceptorContext?.CloseConnection ?? false; - if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !interceptorContext.AcceptSubscription) + if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !acceptSubscription) { result.ReturnCodes.Add(MqttSubscribeReturnCode.Failure); result.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError); @@ -50,12 +52,12 @@ namespace MQTTnet.Server result.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel)); } - if (interceptorContext.CloseConnection) + if (closeConnection) { result.CloseConnection = true; } - if (!interceptorContext.AcceptSubscription || string.IsNullOrEmpty(finalTopicFilter?.Topic)) + if (!acceptSubscription || string.IsNullOrEmpty(finalTopicFilter?.Topic)) { continue; } @@ -83,12 +85,7 @@ namespace MQTTnet.Server foreach (var topicFilter in topicFilters) { var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false); - if (!interceptorContext.AcceptSubscription) - { - continue; - } - - if (!interceptorContext.AcceptSubscription) + if (interceptorContext != null && !interceptorContext.AcceptSubscription) { continue; } @@ -116,7 +113,7 @@ namespace MQTTnet.Server foreach (var topicFilter in unsubscribePacket.TopicFilters) { var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false); - if (!interceptorContext.AcceptUnsubscription) + if (interceptorContext != null && !interceptorContext.AcceptUnsubscription) { reasonCodes.Add(MqttUnsubscribeReasonCode.ImplementationSpecificError); continue; @@ -150,7 +147,7 @@ namespace MQTTnet.Server foreach (var topicFilter in topicFilters) { var interceptorContext = await InterceptUnsubscribeAsync(topicFilter).ConfigureAwait(false); - if (!interceptorContext.AcceptUnsubscription) + if (interceptorContext != null && !interceptorContext.AcceptUnsubscription) { continue; } @@ -187,7 +184,7 @@ namespace MQTTnet.Server { continue; } - + qosLevels.Add(subscription.Value.QualityOfServiceLevel); } @@ -211,23 +208,41 @@ namespace MQTTnet.Server async Task InterceptSubscribeAsync(MqttTopicFilter topicFilter) { - var context = new MqttSubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items); - if (_options.SubscriptionInterceptor != null) + var interceptor = _options.SubscriptionInterceptor; + if (interceptor == null) { - await _options.SubscriptionInterceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false); + return null; } + var context = new MqttSubscriptionInterceptorContext + { + ClientId = _clientSession.ClientId, + TopicFilter = topicFilter, + SessionItems = _clientSession.Items + }; + + await interceptor.InterceptSubscriptionAsync(context).ConfigureAwait(false); + return context; } async Task InterceptUnsubscribeAsync(string topicFilter) { - var context = new MqttUnsubscriptionInterceptorContext(_clientSession.ClientId, topicFilter, _clientSession.Items); - if (_options.UnsubscriptionInterceptor != null) + var interceptor = _options.UnsubscriptionInterceptor; + if (interceptor == null) { - await _options.UnsubscriptionInterceptor.InterceptUnsubscriptionAsync(context).ConfigureAwait(false); + return null; } + var context = new MqttUnsubscriptionInterceptorContext + { + ClientId = _clientSession.ClientId, + Topic = topicFilter, + SessionItems = _clientSession.Items + }; + + await interceptor.InterceptUnsubscriptionAsync(context).ConfigureAwait(false); + return context; } @@ -254,4 +269,4 @@ namespace MQTTnet.Server }; } } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs similarity index 98% rename from Source/MQTTnet/Server/MqttRetainedMessagesManager.cs rename to Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs index 285f224..a0afabd 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttRetainedMessagesManager.cs @@ -1,13 +1,13 @@ -using MQTTnet.Diagnostics; -using MQTTnet.Implementations; -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using MQTTnet.Diagnostics; +using MQTTnet.Implementations; using MQTTnet.Internal; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttRetainedMessagesManager : IMqttRetainedMessagesManager { diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/Internal/MqttServerEventDispatcher.cs similarity index 76% rename from Source/MQTTnet/Server/MqttServerEventDispatcher.cs rename to Source/MQTTnet/Server/Internal/MqttServerEventDispatcher.cs index 0c2d1b0..4023c5a 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/Internal/MqttServerEventDispatcher.cs @@ -1,9 +1,11 @@ -using MQTTnet.Client.Receiving; -using MQTTnet.Diagnostics; using System; using System.Threading.Tasks; +using MQTTnet.Adapter; +using MQTTnet.Client.Receiving; +using MQTTnet.Diagnostics; +using MQTTnet.Packets; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttServerEventDispatcher { @@ -26,7 +28,7 @@ namespace MQTTnet.Server public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; } - public async Task SafeNotifyClientConnectedAsync(string clientId) + public async Task SafeNotifyClientConnectedAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { try { @@ -36,7 +38,13 @@ namespace MQTTnet.Server return; } - await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId)).ConfigureAwait(false); + await handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs + { + ClientId = connectPacket.ClientId, + UserName = connectPacket.Username, + ProtocolVersion = channelAdapter.PacketFormatterAdapter.ProtocolVersion, + Endpoint = channelAdapter.Endpoint + }).ConfigureAwait(false); } catch (Exception exception) { @@ -54,7 +62,12 @@ namespace MQTTnet.Server return; } - await handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType, endpoint)).ConfigureAwait(false); + await handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs + { + ClientId = clientId, + DisconnectType = disconnectType, + Endpoint = endpoint + }).ConfigureAwait(false); } catch (Exception exception) { @@ -72,7 +85,11 @@ namespace MQTTnet.Server return; } - await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false); + await handler.HandleClientSubscribedTopicAsync(new MqttServerClientSubscribedTopicEventArgs + { + ClientId = clientId, + TopicFilter = topicFilter + }).ConfigureAwait(false); } catch (Exception exception) { @@ -90,7 +107,11 @@ namespace MQTTnet.Server return; } - await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs(clientId, topicFilter)).ConfigureAwait(false); + await handler.HandleClientUnsubscribedTopicAsync(new MqttServerClientUnsubscribedTopicEventArgs + { + ClientId = clientId, + TopicFilter = topicFilter + }).ConfigureAwait(false); } catch (Exception exception) { diff --git a/Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs b/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs similarity index 92% rename from Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs rename to Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs index 7f967e9..629485f 100644 --- a/Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/Internal/MqttServerKeepAliveMonitor.cs @@ -1,12 +1,12 @@ -using MQTTnet.Diagnostics; -using MQTTnet.Internal; -using System; +using System; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client.Disconnecting; +using MQTTnet.Diagnostics; using MQTTnet.Implementations; +using MQTTnet.Internal; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public sealed class MqttServerKeepAliveMonitor { @@ -70,7 +70,7 @@ namespace MQTTnet.Server { try { - if (connection.Status != MqttClientConnectionStatus.Running) + if (!connection.IsRunning) { // The connection is already dead or just created so there is no need to check it. return; @@ -94,14 +94,14 @@ namespace MQTTnet.Server // If the client sends 1 sec. the server will allow up to 1.5 seconds. var maxDurationWithoutPacket = connection.ConnectPacket.KeepAlivePeriod * 1.5D; - var secondsWithoutPackage = (now - connection.LastPacketReceivedTimestamp).TotalSeconds; + var secondsWithoutPackage = (now - connection.Statistics.LastPacketReceivedTimestamp).TotalSeconds; if (secondsWithoutPackage < maxDurationWithoutPacket) { // A packet was received before the timeout is affected. return; } - _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", connection.ClientId); + _logger.Warning("Client '{0}': Did not receive any packet or keep alive signal.", connection.ClientId); // Execute the disconnection in background so that the keep alive monitor can continue // with checking other connections. diff --git a/Source/MQTTnet/Server/MqttTopicFilterComparer.cs b/Source/MQTTnet/Server/Internal/MqttTopicFilterComparer.cs similarity index 99% rename from Source/MQTTnet/Server/MqttTopicFilterComparer.cs rename to Source/MQTTnet/Server/Internal/MqttTopicFilterComparer.cs index 5b104b4..1fe976e 100644 --- a/Source/MQTTnet/Server/MqttTopicFilterComparer.cs +++ b/Source/MQTTnet/Server/Internal/MqttTopicFilterComparer.cs @@ -1,6 +1,6 @@ using System; -namespace MQTTnet.Server +namespace MQTTnet.Server.Internal { public static class MqttTopicFilterComparer { diff --git a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs index f606a57..c4df979 100644 --- a/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs @@ -1,35 +1,27 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using MQTTnet.Diagnostics; namespace MQTTnet.Server { public sealed class MqttApplicationMessageInterceptorContext { - public MqttApplicationMessageInterceptorContext(string clientId, IDictionary sessionItems, IMqttNetScopedLogger logger) - { - ClientId = clientId; - SessionItems = sessionItems; - Logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - /// /// Gets the currently used logger. /// - public IMqttNetScopedLogger Logger { get; } + public IMqttNetScopedLogger Logger { get; internal set; } /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } public MqttApplicationMessage ApplicationMessage { get; set; } /// /// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// - public IDictionary SessionItems { get; } + public IDictionary SessionItems { get; internal set; } public bool AcceptPublish { get; set; } = true; diff --git a/Source/MQTTnet/Server/MqttClientConnectionStatus.cs b/Source/MQTTnet/Server/MqttClientConnectionStatus.cs deleted file mode 100644 index 8a132b0..0000000 --- a/Source/MQTTnet/Server/MqttClientConnectionStatus.cs +++ /dev/null @@ -1,11 +0,0 @@ -namespace MQTTnet.Server -{ - public enum MqttClientConnectionStatus - { - Initializing, - - Running, - - Finalizing - } -} \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs b/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs index 881666e..39b08ff 100644 --- a/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorContext.cs @@ -2,19 +2,11 @@ namespace MQTTnet.Server { - public class MqttClientMessageQueueInterceptorContext + public sealed class MqttClientMessageQueueInterceptorContext { - public MqttClientMessageQueueInterceptorContext(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, MqttQualityOfServiceLevel subscriptionQualityOfServiceLevel) - { - SenderClientId = senderClientId; - ReceiverClientId = receiverClientId; - ApplicationMessage = applicationMessage; - SubscriptionQualityOfServiceLevel = subscriptionQualityOfServiceLevel; - } + public string SenderClientId { get; internal set; } - public string SenderClientId { get; } - - public string ReceiverClientId { get; } + public string ReceiverClientId { get; internal set; } public MqttApplicationMessage ApplicationMessage { get; set; } diff --git a/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs index 2cb7fc3..9d1118d 100644 --- a/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs +++ b/Source/MQTTnet/Server/MqttClientMessageQueueInterceptorDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttClientMessageQueueInterceptorDelegate : IMqttServerClientMessageQueueInterceptor + public sealed class MqttClientMessageQueueInterceptorDelegate : IMqttServerClientMessageQueueInterceptor { - private readonly Func _callback; + readonly Func _callback; public MqttClientMessageQueueInterceptorDelegate(Action callback) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _callback = context => { callback(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs index e9d6aeb..57d3952 100644 --- a/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs +++ b/Source/MQTTnet/Server/MqttConnectionValidatorContext.cs @@ -4,21 +4,21 @@ using System.Security.Cryptography.X509Certificates; using System.Text; using MQTTnet.Adapter; using MQTTnet.Formatter; +using MQTTnet.Implementations; using MQTTnet.Packets; using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttConnectionValidatorContext + public sealed class MqttConnectionValidatorContext { - private readonly MqttConnectPacket _connectPacket; - private readonly IMqttChannelAdapter _clientAdapter; + readonly MqttConnectPacket _connectPacket; + readonly IMqttChannelAdapter _clientAdapter; - public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter, IDictionary sessionItems) + public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter) { _connectPacket = connectPacket; _clientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); - SessionItems = sessionItems; } /// @@ -39,7 +39,7 @@ namespace MQTTnet.Server public byte[] RawPassword => _connectPacket?.Password; - public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]); + public string Password => Encoding.UTF8.GetString(RawPassword ?? PlatformAbstractionLayer.EmptyByteArray); /// /// Gets or sets the will delay interval. @@ -128,7 +128,7 @@ namespace MQTTnet.Server /// /// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// - public IDictionary SessionItems { get; } + public IDictionary SessionItems { get; internal set; } /// /// This is used for MQTTv3 only. diff --git a/Source/MQTTnet/Server/MqttPendingApplicationMessage.cs b/Source/MQTTnet/Server/MqttPendingApplicationMessage.cs index dc198d5..52ed9c7 100644 --- a/Source/MQTTnet/Server/MqttPendingApplicationMessage.cs +++ b/Source/MQTTnet/Server/MqttPendingApplicationMessage.cs @@ -1,4 +1,6 @@ -namespace MQTTnet.Server +using MQTTnet.Server.Internal; + +namespace MQTTnet.Server { public sealed class MqttPendingApplicationMessage { diff --git a/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs b/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs index 5ca163d..8ffd55f 100644 --- a/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs +++ b/Source/MQTTnet/Server/MqttQueuedApplicationMessage.cs @@ -3,7 +3,7 @@ using MQTTnet.Protocol; namespace MQTTnet.Server { - public class MqttQueuedApplicationMessage + public sealed class MqttQueuedApplicationMessage { public MqttApplicationMessage ApplicationMessage { get; set; } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 92ef34f..1b42feb 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -12,6 +12,7 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Implementations; using MQTTnet.Internal; +using MQTTnet.Server.Internal; namespace MQTTnet.Server { diff --git a/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs index 160ed48..e946399 100644 --- a/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _callback = context => { callback(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs index 88f0c55..21e8df3 100644 --- a/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientConnectedEventArgs.cs @@ -1,18 +1,29 @@ using System; +using MQTTnet.Formatter; namespace MQTTnet.Server { - public class MqttServerClientConnectedEventArgs : EventArgs + public sealed class MqttServerClientConnectedEventArgs : EventArgs { - public MqttServerClientConnectedEventArgs(string clientId) - { - ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - } - /// - /// Gets the client identifier. + /// Gets the client identifier of the connected client. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } + + /// + /// Gets the user name of the connected client. + /// + public string UserName { get; internal set; } + + /// + /// Gets the protocol version which is used by the connected client. + /// + public MqttProtocolVersion ProtocolVersion { get; internal set; } + + /// + /// Gets the endpoint of the connected client. + /// + public string Endpoint { get; internal set; } } } diff --git a/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs index 2883d66..a416ea9 100644 --- a/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientConnectedHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler + public sealed class MqttServerClientConnectedHandlerDelegate : IMqttServerClientConnectedHandler { - private readonly Func _handler; + readonly Func _handler; public MqttServerClientConnectedHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs index 9834b20..77ad076 100644 --- a/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientDisconnectedEventArgs.cs @@ -2,23 +2,16 @@ using System; namespace MQTTnet.Server { - public class MqttServerClientDisconnectedEventArgs : EventArgs + public sealed class MqttServerClientDisconnectedEventArgs : EventArgs { - public MqttServerClientDisconnectedEventArgs(string clientId, MqttClientDisconnectType disconnectType, string endpoint) - { - ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - DisconnectType = disconnectType; - Endpoint = endpoint; - } - /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } - public MqttClientDisconnectType DisconnectType { get; } + public MqttClientDisconnectType DisconnectType { get; internal set; } - public string Endpoint { get; } + public string Endpoint { get; internal set; } } } diff --git a/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs index b1d495a..2a03df9 100644 --- a/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientDisconnectedHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler + public sealed class MqttServerClientDisconnectedHandlerDelegate : IMqttServerClientDisconnectedHandler { - private readonly Func _handler; + readonly Func _handler; public MqttServerClientDisconnectedHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs index 3586b73..23850b2 100644 --- a/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicEventArgs.cs @@ -2,24 +2,18 @@ namespace MQTTnet.Server { - public class MqttServerClientSubscribedTopicEventArgs : EventArgs + public sealed class MqttServerClientSubscribedTopicEventArgs : EventArgs { - public MqttServerClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter) - { - ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); - } - /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } /// /// Gets the topic filter. /// The topic filter can contain topics and wildcards. /// - public MqttTopicFilter TopicFilter { get; } + public MqttTopicFilter TopicFilter { get; internal set; } } } diff --git a/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs index 048b0a8..6dbf4a3 100644 --- a/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientSubscribedTopicHandlerDelegate.cs @@ -1,24 +1,37 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerClientSubscribedHandlerDelegate : IMqttServerClientSubscribedTopicHandler + [Obsolete("Use MqttServerClientSubscribedTopicHandlerDelegate instead. This will be removed in a future version.")] + public sealed class MqttServerClientSubscribedHandlerDelegate : MqttServerClientSubscribedTopicHandlerDelegate { - private readonly Func _handler; + public MqttServerClientSubscribedHandlerDelegate(Action handler) : base(handler) + { + } + + public MqttServerClientSubscribedHandlerDelegate(Func handler) : base(handler) + { + } + } + + public class MqttServerClientSubscribedTopicHandlerDelegate : IMqttServerClientSubscribedTopicHandler + { + readonly Func _handler; - public MqttServerClientSubscribedHandlerDelegate(Action handler) + public MqttServerClientSubscribedTopicHandlerDelegate(Action handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } - public MqttServerClientSubscribedHandlerDelegate(Func handler) + public MqttServerClientSubscribedTopicHandlerDelegate(Func handler) { _handler = handler ?? throw new ArgumentNullException(nameof(handler)); } diff --git a/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs index 8ed1264..341e1f4 100644 --- a/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs +++ b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicEventArgs.cs @@ -2,24 +2,18 @@ namespace MQTTnet.Server { - public class MqttServerClientUnsubscribedTopicEventArgs : EventArgs + public sealed class MqttServerClientUnsubscribedTopicEventArgs : EventArgs { - public MqttServerClientUnsubscribedTopicEventArgs(string clientId, string topicFilter) - { - ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); - TopicFilter = topicFilter ?? throw new ArgumentNullException(nameof(topicFilter)); - } - /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } /// /// Gets or sets the topic filter. /// The topic filter can contain topics and wildcards. /// - public string TopicFilter { get; } + public string TopicFilter { get; internal set; } } } diff --git a/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs index 4416287..c93636c 100644 --- a/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerClientUnsubscribedTopicHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler + public sealed class MqttServerClientUnsubscribedTopicHandlerDelegate : IMqttServerClientUnsubscribedTopicHandler { - private readonly Func _handler; + readonly Func _handler; public MqttServerClientUnsubscribedTopicHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs b/Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs index 8be08c8..b6a51a1 100644 --- a/Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerConnectionValidatorDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerConnectionValidatorDelegate : IMqttServerConnectionValidator + public sealed class MqttServerConnectionValidatorDelegate : IMqttServerConnectionValidator { - private readonly Func _callback; + readonly Func _callback; public MqttServerConnectionValidatorDelegate(Action callback) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _callback = context => { callback(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs index b99ca18..9f8bf4f 100644 --- a/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs @@ -17,7 +17,7 @@ namespace MQTTnet.Server _callback = context => { callback(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs index 1457d9e..101181c 100644 --- a/Source/MQTTnet/Server/MqttServerOptions.cs +++ b/Source/MQTTnet/Server/MqttServerOptions.cs @@ -1,4 +1,5 @@ using System; +using MQTTnet.Server.Internal; namespace MQTTnet.Server { diff --git a/Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs index 8a05357..ad33184 100644 --- a/Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerStartedHandlerDelegate : IMqttServerStartedHandler + public sealed class MqttServerStartedHandlerDelegate : IMqttServerStartedHandler { - private readonly Func _handler; + readonly Func _handler; public MqttServerStartedHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs b/Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs index 14e6a62..8a9a37e 100644 --- a/Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerStoppedHandlerDelegate : IMqttServerStoppedHandler + public sealed class MqttServerStoppedHandlerDelegate : IMqttServerStoppedHandler { - private readonly Func _handler; + readonly Func _handler; public MqttServerStoppedHandlerDelegate(Action handler) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _handler = eventArgs => { handler(eventArgs); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs b/Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs index f500e07..88d6823 100644 --- a/Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs +++ b/Source/MQTTnet/Server/MqttServerSubscriptionInterceptorDelegate.cs @@ -1,11 +1,12 @@ using System; using System.Threading.Tasks; +using MQTTnet.Implementations; namespace MQTTnet.Server { - public class MqttServerSubscriptionInterceptorDelegate : IMqttServerSubscriptionInterceptor + public sealed class MqttServerSubscriptionInterceptorDelegate : IMqttServerSubscriptionInterceptor { - private readonly Func _callback; + readonly Func _callback; public MqttServerSubscriptionInterceptorDelegate(Action callback) { @@ -14,7 +15,7 @@ namespace MQTTnet.Server _callback = context => { callback(context); - return Task.FromResult(0); + return PlatformAbstractionLayer.CompletedTask; }; } diff --git a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs index cc186e5..025a650 100644 --- a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs @@ -2,20 +2,13 @@ namespace MQTTnet.Server { - public class MqttSubscriptionInterceptorContext + public sealed class MqttSubscriptionInterceptorContext { - public MqttSubscriptionInterceptorContext(string clientId, MqttTopicFilter topicFilter, IDictionary sessionItems) - { - ClientId = clientId; - TopicFilter = topicFilter; - SessionItems = sessionItems; - } - /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } /// /// Gets or sets the topic filter. @@ -26,7 +19,7 @@ namespace MQTTnet.Server /// /// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// - public IDictionary SessionItems { get; } + public IDictionary SessionItems { get; internal set; } public bool AcceptSubscription { get; set; } = true; diff --git a/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs index e5f8d75..99b1f1f 100644 --- a/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttUnsubscriptionInterceptorContext.cs @@ -2,32 +2,25 @@ namespace MQTTnet.Server { - public class MqttUnsubscriptionInterceptorContext + public sealed class MqttUnsubscriptionInterceptorContext { - public MqttUnsubscriptionInterceptorContext(string clientId, string topic, IDictionary sessionItems) - { - ClientId = clientId; - Topic = topic; - SessionItems = sessionItems; - } - /// /// Gets the client identifier. /// Hint: This identifier needs to be unique over all used clients / devices on the broker to avoid connection issues. /// - public string ClientId { get; } + public string ClientId { get; internal set; } /// /// Gets or sets the MQTT topic. /// In MQTT, the word topic refers to an UTF-8 string that the broker uses to filter messages for each connected client. /// The topic consists of one or more topic levels. Each topic level is separated by a forward slash (topic level separator). /// - public string Topic { get; set; } + public string Topic { get; internal set; } /// /// Gets or sets a key/value collection that can be used to share data within the scope of this session. /// - public IDictionary SessionItems { get; } + public IDictionary SessionItems { get; internal set; } public bool AcceptUnsubscription { get; set; } = true; diff --git a/Source/MQTTnet/Server/PrepareClientSessionResult.cs b/Source/MQTTnet/Server/PrepareClientSessionResult.cs deleted file mode 100644 index 7509037..0000000 --- a/Source/MQTTnet/Server/PrepareClientSessionResult.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace MQTTnet.Server -{ - public class PrepareClientSessionResult - { - public bool IsExistingSession { get; set; } - - public MqttClientConnection Session { get; set; } - } -} diff --git a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs index c080b51..29fb2d9 100644 --- a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs @@ -16,8 +16,12 @@ namespace MQTTnet.Server.Status MqttProtocolVersion ProtocolVersion { get; } + DateTime ConnectedTimestamp { get; set; } + DateTime LastPacketReceivedTimestamp { get; } + DateTime LastPacketSentTimestamp { get; set; } + DateTime LastNonKeepAlivePacketReceivedTimestamp { get; } long ReceivedApplicationMessagesCount { get; } diff --git a/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs b/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs index 115da13..9f1cd2b 100644 --- a/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttSessionStatus.cs @@ -11,6 +11,9 @@ namespace MQTTnet.Server.Status /// string ClientId { get; } + /// + /// Gets the count of messages which are not yet sent to the client but already queued. + /// long PendingApplicationMessagesCount { get; } IDictionary Items { get; } diff --git a/Source/MQTTnet/Server/Status/MqttClientStatus.cs b/Source/MQTTnet/Server/Status/MqttClientStatus.cs index 9f2f670..1770cde 100644 --- a/Source/MQTTnet/Server/Status/MqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttClientStatus.cs @@ -2,6 +2,7 @@ using System; using System.Threading.Tasks; using MQTTnet.Client.Disconnecting; +using MQTTnet.Server.Internal; namespace MQTTnet.Server.Status { @@ -24,9 +25,11 @@ namespace MQTTnet.Server.Status public MqttProtocolVersion ProtocolVersion { get; set; } - public DateTime LastPacketReceivedTimestamp { get; set; } - public DateTime ConnectedTimestamp { get; set; } + + public DateTime LastPacketReceivedTimestamp { get; set; } + + public DateTime LastPacketSentTimestamp { get; set; } public DateTime LastNonKeepAlivePacketReceivedTimestamp { get; set; } @@ -43,7 +46,7 @@ namespace MQTTnet.Server.Status public long BytesSent { get; set; } public long BytesReceived { get; set; } - + public Task DisconnectAsync() { return _connection.StopAsync(MqttClientDisconnectReason.NormalDisconnection); diff --git a/Source/MQTTnet/Server/Status/MqttSessionStatus.cs b/Source/MQTTnet/Server/Status/MqttSessionStatus.cs index af5a6e8..e4bd478 100644 --- a/Source/MQTTnet/Server/Status/MqttSessionStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttSessionStatus.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using MQTTnet.Server.Internal; namespace MQTTnet.Server.Status { @@ -25,8 +26,11 @@ namespace MQTTnet.Server.Status public DateTime CreatedTimestamp { get; set; } + /// + /// This items can be used by the library user in order to store custom information. + /// public IDictionary Items { get; set; } - + public Task DeleteAsync() { return _sessionsManager.DeleteSessionAsync(ClientId); diff --git a/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs b/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs index 2df92ad..92365c3 100644 --- a/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs +++ b/Tests/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs @@ -2,6 +2,7 @@ using BenchmarkDotNet.Jobs; using MQTTnet.Server; using System; +using MQTTnet.Server.Internal; namespace MQTTnet.Benchmarks { diff --git a/Tests/MQTTnet.Core.Tests/BaseTestClass.cs b/Tests/MQTTnet.Core.Tests/BaseTestClass.cs new file mode 100644 index 0000000..f7ab7a6 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/BaseTestClass.cs @@ -0,0 +1,22 @@ +using System; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Tests.Mockups; + +namespace MQTTnet.Tests +{ + public abstract class BaseTestClass + { + public TestContext TestContext { get; set; } + + protected TestEnvironment CreateTestEnvironment() + { + return new TestEnvironment(TestContext); + } + + protected Task LongDelay() + { + return Task.Delay(TimeSpan.FromSeconds(1)); + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs index ee2ac24..94b42eb 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs @@ -16,7 +16,7 @@ using MQTTnet.Tests.Mockups; namespace MQTTnet.Tests.MQTTv5 { [TestClass] - public class Client_Tests + public sealed class Client_Tests { public TestContext TestContext { get; set; } @@ -60,6 +60,7 @@ namespace MQTTnet.Tests.MQTTv5 Assert.AreEqual(2, receivedMessage.UserProperties.Count); } } + [TestMethod] public async Task Connect_With_AssignedClientId() { @@ -116,7 +117,6 @@ namespace MQTTnet.Tests.MQTTv5 Assert.AreEqual("test123", serverConnectedClientId); Assert.AreEqual("test123", serverDisconnectedClientId); Assert.AreEqual("test123", clientAssignedClientId); - } } @@ -165,7 +165,6 @@ namespace MQTTnet.Tests.MQTTv5 Assert.AreEqual(1, result.Items.Count); Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS1, result.Items[0].ResultCode); - } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index f630280..64026b5 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -172,21 +172,25 @@ namespace MQTTnet.Tests.Mockups return StartServerAsync(new MqttServerOptionsBuilder()); } - public async Task StartServerAsync(MqttServerOptionsBuilder options) + public IMqttServer CreateServer() { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (Server != null) { throw new InvalidOperationException("Server already started."); } Server = new TestServerWrapper(_mqttFactory.CreateMqttServer(ServerLogger), TestContext, this); + return Server; + } + public async Task StartServerAsync(MqttServerOptionsBuilder options) + { + CreateServer(); + options.WithDefaultEndpointPort(ServerPort); options.WithMaxPendingMessagesPerClient(int.MaxValue); - - await Server.StartAsync(options.Build()).ConfigureAwait(false); + + await Server.StartAsync(options.Build()); return Server; } diff --git a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs index d56ef72..33b88c8 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClient_Tests.cs @@ -50,7 +50,7 @@ namespace MQTTnet.Tests Topic = "Test" }, CancellationToken.None); - await Task.Delay(1000); + await Task.Delay(500); // This will simulate a device which closes the connection directly // after sending the data so do delay is added between send and dispose! diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs index 3bab0aa..3d38ca7 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs @@ -5,6 +5,7 @@ using MQTTnet.Server; using MQTTnet.Tests.Mockups; using System.Collections.Concurrent; using System.Threading.Tasks; +using MQTTnet.Server.Internal; namespace MQTTnet.Tests { diff --git a/Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs new file mode 100644 index 0000000..c90ef82 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/Server_Events_Tests.cs @@ -0,0 +1,186 @@ +using System; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client; +using MQTTnet.Client.Receiving; +using MQTTnet.Formatter; +using MQTTnet.Protocol; +using MQTTnet.Server; + +namespace MQTTnet.Tests +{ + [TestClass] + public sealed class Server_Events_Tests : BaseTestClass + { + [TestMethod] + public async Task Fire_Client_Connected_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + MqttServerClientConnectedEventArgs eventArgs = null; + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => + { + eventArgs = e; + }); + + await testEnvironment.ConnectClientAsync(o => o.WithCredentials("TheUser")); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + + Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Connected_Event))); + Assert.IsTrue(eventArgs.Endpoint.Contains("::1")); + Assert.AreEqual(MqttProtocolVersion.V311, eventArgs.ProtocolVersion); + Assert.AreEqual("TheUser", eventArgs.UserName); + } + } + + [TestMethod] + public async Task Fire_Client_Disconnected_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + MqttServerClientDisconnectedEventArgs eventArgs = null; + server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(e => + { + eventArgs = e; + }); + + var client = await testEnvironment.ConnectClientAsync(o => o.WithCredentials("TheUser")); + await client.DisconnectAsync(); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + + Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Disconnected_Event))); + Assert.IsTrue(eventArgs.Endpoint.Contains("::1")); + Assert.AreEqual(MqttClientDisconnectType.Clean, eventArgs.DisconnectType); + } + } + + [TestMethod] + public async Task Fire_Client_Subscribed_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + MqttServerClientSubscribedTopicEventArgs eventArgs = null; + server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(e => + { + eventArgs = e; + }); + + var client = await testEnvironment.ConnectClientAsync(); + await client.SubscribeAsync("The/Topic", MqttQualityOfServiceLevel.AtLeastOnce); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + + Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Subscribed_Event))); + Assert.AreEqual("The/Topic", eventArgs.TopicFilter.Topic); + Assert.AreEqual(MqttQualityOfServiceLevel.AtLeastOnce, eventArgs.TopicFilter.QualityOfServiceLevel); + } + } + + [TestMethod] + public async Task Fire_Client_Unsubscribed_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + MqttServerClientUnsubscribedTopicEventArgs eventArgs = null; + server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => + { + eventArgs = e; + }); + + var client = await testEnvironment.ConnectClientAsync(); + await client.UnsubscribeAsync("The/Topic"); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + + Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Client_Unsubscribed_Event))); + Assert.AreEqual("The/Topic", eventArgs.TopicFilter); + } + } + + [TestMethod] + public async Task Fire_Application_Message_Received_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + MqttApplicationMessageReceivedEventArgs eventArgs = null; + server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => + { + eventArgs = e; + }); + + var client = await testEnvironment.ConnectClientAsync(); + await client.PublishAsync("The_Topic", "The_Payload"); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + + Assert.IsTrue(eventArgs.ClientId.StartsWith(nameof(Fire_Application_Message_Received_Event))); + Assert.AreEqual("The_Topic", eventArgs.ApplicationMessage.Topic); + Assert.AreEqual("The_Payload", eventArgs.ApplicationMessage.ConvertPayloadToString()); + } + } + + [TestMethod] + public async Task Fire_Started_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = testEnvironment.CreateServer(); + + EventArgs eventArgs = null; + server.StartedHandler = new MqttServerStartedHandlerDelegate(e => + { + eventArgs = e; + }); + + await server.StartAsync(new MqttServerOptionsBuilder().Build()); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + } + } + + [TestMethod] + public async Task Fire_Stopped_Event() + { + using (var testEnvironment = CreateTestEnvironment()) + { + var server = await testEnvironment.StartServerAsync(); + + EventArgs eventArgs = null; + server.StoppedHandler = new MqttServerStoppedHandlerDelegate(e => + { + eventArgs = e; + }); + + await server.StopAsync(); + + await LongDelay(); + + Assert.IsNotNull(eventArgs); + } + } + } +} \ No newline at end of file diff --git a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs index b93e3e4..fee808d 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs @@ -10,10 +10,8 @@ using MQTTnet.Server; namespace MQTTnet.Tests { [TestClass] - public class Server_Status_Tests + public sealed class Server_Status_Tests : BaseTestClass { - public TestContext TestContext { get; set; } - [TestMethod] public async Task Show_Client_And_Session_Statistics() { @@ -143,7 +141,7 @@ namespace MQTTnet.Tests // At most once will send one packet to the client and the server will reply // with an additional ACK packet. await c1.PublishAsync("a", string.Empty, MqttQualityOfServiceLevel.AtLeastOnce); - await Task.Delay(50); + await Task.Delay(250); var clientStatus = await server.GetClientStatusAsync(); diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 09f085b..b6a5e71 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -147,7 +147,7 @@ namespace MQTTnet.Tests } [TestMethod] - public async Task Will_Message_Do_Not_Send() + public async Task Will_Message_Do_Not_Send_On_Clean_Disconnect() { using (var testEnvironment = new TestEnvironment(TestContext)) { @@ -155,7 +155,7 @@ namespace MQTTnet.Tests await testEnvironment.StartServerAsync(); - var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); + var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").Build(); var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage); @@ -171,6 +171,38 @@ namespace MQTTnet.Tests Assert.AreEqual(0, receivedMessagesCount); } } + + [TestMethod] + public async Task Will_Message_Do_Not_Send_On_Takeover() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + var receivedMessagesCount = 0; + + await testEnvironment.StartServerAsync(); + + // C1 will receive the last will! + var c1 = await testEnvironment.ConnectClientAsync(); + c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount)); + await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build()); + + // C2 has the last will defined. + var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").Build(); + + var clientOptions = new MqttClientOptionsBuilder() + .WithWillMessage(willMessage) + .WithClientId("WillOwner"); + + var c2 = await testEnvironment.ConnectClientAsync(clientOptions); + + // C3 will do the connection takeover. + var c3 = await testEnvironment.ConnectClientAsync(clientOptions); + + await Task.Delay(1000); + + Assert.AreEqual(0, receivedMessagesCount); + } + } [TestMethod] public async Task Will_Message_Send() @@ -571,7 +603,12 @@ namespace MQTTnet.Tests using (var testEnvironment = new TestEnvironment(TestContext)) { var server = await testEnvironment.StartServerAsync(); - server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1")); + + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(async e => + { + // Every client will automatically subscribe to this topic. + await server.SubscribeAsync(e.ClientId, "topic1"); + }); var client = await testEnvironment.ConnectClientAsync(); var receivedMessages = new List(); @@ -636,11 +673,11 @@ namespace MQTTnet.Tests Assert.AreEqual(1, clientConnectedCalled); Assert.AreEqual(0, clientDisconnectedCalled); - await Task.Delay(500); + await Task.Delay(1000); await c1.DisconnectAsync(); - await Task.Delay(500); + await Task.Delay(1000); Assert.AreEqual(1, clientConnectedCalled); Assert.AreEqual(1, clientDisconnectedCalled); diff --git a/Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs b/Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs index 5fb3d44..56ebf39 100644 --- a/Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/TopicFilterComparer_Tests.cs @@ -1,5 +1,6 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Server; +using MQTTnet.Server.Internal; namespace MQTTnet.Tests { diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index 7d64d8b..5543cde 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -31,6 +31,7 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine("a = Start QoS 2 benchmark"); Console.WriteLine("b = Start QoS 1 benchmark"); Console.WriteLine("c = Start QoS 0 benchmark"); + Console.WriteLine("d = Start server with logging"); var pressedKey = Console.ReadKey(true); if (pressedKey.KeyChar == '1') @@ -83,6 +84,10 @@ namespace MQTTnet.TestApp.NetCore { Task.Run(PerformanceTest.RunQoS0Test); } + else if (pressedKey.KeyChar == 'd') + { + Task.Run(ServerTest.RunEmptyServerWithLogging); + } Thread.Sleep(Timeout.Infinite); } diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index b0d6534..88969e8 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -2,8 +2,10 @@ using System.Text; using System.Threading.Tasks; using MQTTnet.Client.Receiving; +using MQTTnet.Diagnostics; using MQTTnet.Protocol; using MQTTnet.Server; +using MQTTnet.Server.Internal; namespace MQTTnet.TestApp.NetCore { @@ -13,6 +15,19 @@ namespace MQTTnet.TestApp.NetCore { var mqttServer = new MqttFactory().CreateMqttServer(); mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult(); + + Console.WriteLine("Press any key to exit."); + Console.ReadLine(); + } + + public static void RunEmptyServerWithLogging() + { + var logger = new MqttNetLogger(); + MqttNetConsoleLogger.ForwardToConsole(logger); + + var mqttFactory = new MqttFactory(logger); + var mqttServer = mqttFactory.CreateMqttServer(); + mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult(); Console.WriteLine("Press any key to exit."); Console.ReadLine();