From f949d54aef6bbf8b3db99838d7122a6a43783925 Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Tue, 17 Dec 2019 11:54:59 +0100 Subject: [PATCH 1/3] Added IMqttRetainedMessagesManager * Added interface IMqttRetainedMessagesManager to allow for different retained messages manager implementations. The interface copies almost exactly the current MqttRetainedMessagesManager implementation. * Added IMqttRetainedMessagesManager.Start() to configure the provided IMqttRetainedMessagesManager outside the constructor. This method returns Task because some implementations that use external storage may be slow on initialization (e.g., connect to a database). * Modified MqttRetainedMessagesManager to implement new interface. --- .../Server/IMqttRetainedMessagesManager.cs | 21 +++++++++++++++++++ Source/MQTTnet/Server/IMqttServerOptions.cs | 4 ++-- Source/MQTTnet/Server/MqttClientConnection.cs | 16 +++++++------- Source/MQTTnet/Server/MqttClientSession.cs | 2 +- .../Server/MqttClientSessionsManager.cs | 16 +++++++------- .../Server/MqttRetainedMessagesManager.cs | 11 +++++----- Source/MQTTnet/Server/MqttServer.cs | 13 ++++++------ Source/MQTTnet/Server/MqttServerOptions.cs | 4 +++- .../Server/MqttServerOptionsBuilder.cs | 10 +++++++-- 9 files changed, 64 insertions(+), 33 deletions(-) create mode 100644 Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs diff --git a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs new file mode 100644 index 0000000..36ace61 --- /dev/null +++ b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using MQTTnet.Diagnostics; + +namespace MQTTnet.Server +{ + public interface IMqttRetainedMessagesManager + { + Task Start(IMqttServerOptions options, IMqttNetChildLogger logger); + + Task LoadMessagesAsync(); + + Task ClearMessagesAsync(); + + Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage); + + Task> GetMessagesAsync(); + + Task> GetSubscribedMessagesAsync(ICollection topicFilters); + } +} diff --git a/Source/MQTTnet/Server/IMqttServerOptions.cs b/Source/MQTTnet/Server/IMqttServerOptions.cs index 2145845..7df6f54 100644 --- a/Source/MQTTnet/Server/IMqttServerOptions.cs +++ b/Source/MQTTnet/Server/IMqttServerOptions.cs @@ -22,8 +22,8 @@ namespace MQTTnet.Server MqttServerTcpEndpointOptions DefaultEndpointOptions { get; } MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; } - IMqttServerStorage Storage { get; } - + IMqttServerStorage Storage { get; } + IMqttRetainedMessagesManager RetainedMessagesManager { get; } } } \ No newline at end of file diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index ed36b6a..c9e2553 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -21,7 +21,7 @@ namespace MQTTnet.Server private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource(); - private readonly MqttRetainedMessagesManager _retainedMessagesManager; + private readonly IMqttRetainedMessagesManager _retainedMessagesManager; private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; private readonly MqttClientSessionsManager _sessionsManager; @@ -36,7 +36,7 @@ namespace MQTTnet.Server private Task _packageReceiverTask; private DateTime _lastPacketReceivedTimestamp; private DateTime _lastNonKeepAlivePacketReceivedTimestamp; - + private long _receivedPacketsCount; private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. private long _receivedApplicationMessagesCount; @@ -48,14 +48,14 @@ namespace MQTTnet.Server MqttClientSession session, IMqttServerOptions serverOptions, MqttClientSessionsManager sessionsManager, - MqttRetainedMessagesManager retainedMessagesManager, + IMqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) { Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); - + _channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter)); _dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter; _endpoint = _channelAdapter.Endpoint; @@ -76,7 +76,7 @@ namespace MQTTnet.Server public string ClientId => ConnectPacket.ClientId; public MqttClientSession Session { get; } - + public async Task StopAsync() { StopInternal(); @@ -112,7 +112,7 @@ namespace MQTTnet.Server status.BytesSent = _channelAdapter.BytesSent; status.BytesReceived = _channelAdapter.BytesReceived; } - + public void Dispose() { _cancellationToken.Dispose(); @@ -130,7 +130,7 @@ namespace MQTTnet.Server try { _logger.Info("Client '{0}': Session started.", ClientId); - + _channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted; _channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted; @@ -244,7 +244,7 @@ namespace MQTTnet.Server _channelAdapter.ReadingPacketCompletedCallback = null; _logger.Info("Client '{0}': Session stopped.", ClientId); - + _packageReceiverTask = null; } diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index d165001..d097b9f 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -52,7 +52,7 @@ namespace MQTTnet.Server ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); } - public async Task SubscribeAsync(ICollection topicFilters, MqttRetainedMessagesManager retainedMessagesManager) + public async Task SubscribeAsync(ICollection topicFilters, IMqttRetainedMessagesManager retainedMessagesManager) { await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 2da3efc..7e366a6 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -25,13 +25,13 @@ namespace MQTTnet.Server private readonly CancellationToken _cancellationToken; private readonly MqttServerEventDispatcher _eventDispatcher; - private readonly MqttRetainedMessagesManager _retainedMessagesManager; + private readonly IMqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; public MqttClientSessionsManager( IMqttServerOptions options, - MqttRetainedMessagesManager retainedMessagesManager, + IMqttRetainedMessagesManager retainedMessagesManager, CancellationToken cancellationToken, MqttServerEventDispatcher eventDispatcher, IMqttNetChildLogger logger) @@ -72,7 +72,7 @@ namespace MQTTnet.Server { var clientStatus = new MqttClientStatus(connection); connection.FillStatus(clientStatus); - + var sessionStatus = new MqttSessionStatus(connection.Session, this); connection.Session.FillStatus(sessionStatus); clientStatus.Session = sessionStatus; @@ -91,7 +91,7 @@ namespace MQTTnet.Server { var sessionStatus = new MqttSessionStatus(session, this); session.FillStatus(sessionStatus); - + result.Add(sessionStatus); } @@ -259,7 +259,7 @@ namespace MQTTnet.Server var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); - + disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false); } catch (OperationCanceledException) @@ -272,7 +272,7 @@ namespace MQTTnet.Server finally { if (clientWasConnected) - { + { if (clientId != null) { _connections.TryRemove(clientId, out _); @@ -333,13 +333,13 @@ namespace MQTTnet.Server { await existingConnection.StopAsync().ConfigureAwait(false); } - + if (isSessionPresent) { if (connectPacket.CleanSession) { session = null; - + _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); } else diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs index 2e6af16..e79bcef 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs @@ -7,20 +7,21 @@ using MQTTnet.Internal; namespace MQTTnet.Server { - public class MqttRetainedMessagesManager + public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager { private readonly byte[] _emptyArray = new byte[0]; private readonly AsyncLock _messagesLock = new AsyncLock(); private readonly Dictionary _messages = new Dictionary(); - private readonly IMqttNetChildLogger _logger; - private readonly IMqttServerOptions _options; + private IMqttNetChildLogger _logger; + private IMqttServerOptions _options; - public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetChildLogger logger) + public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger) { if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); _options = options ?? throw new ArgumentNullException(nameof(options)); + return Task.CompletedTask; } public async Task LoadMessagesAsync() @@ -128,7 +129,7 @@ namespace MQTTnet.Server break; } } - + return matchingRetainedMessages; } diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index f902fc0..dee2274 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -19,7 +19,7 @@ namespace MQTTnet.Server private readonly IMqttNetChildLogger _logger; private MqttClientSessionsManager _clientSessionsManager; - private MqttRetainedMessagesManager _retainedMessagesManager; + private IMqttRetainedMessagesManager _retainedMessagesManager; private CancellationTokenSource _cancellationTokenSource; public MqttServer(IEnumerable adapters, IMqttNetChildLogger logger) @@ -48,7 +48,7 @@ namespace MQTTnet.Server get => _eventDispatcher.ClientDisconnectedHandler; set => _eventDispatcher.ClientDisconnectedHandler = value; } - + public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler { get => _eventDispatcher.ClientSubscribedTopicHandler; @@ -60,7 +60,7 @@ namespace MQTTnet.Server get => _eventDispatcher.ClientUnsubscribedTopicHandler; set => _eventDispatcher.ClientUnsubscribedTopicHandler = value; } - + public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get => _eventDispatcher.ApplicationMessageReceivedHandler; @@ -121,7 +121,8 @@ namespace MQTTnet.Server _cancellationTokenSource = new CancellationTokenSource(); - _retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger); + _retainedMessagesManager = Options.RetainedMessagesManager ?? new MqttRetainedMessagesManager(); + await _retainedMessagesManager.Start(Options, _logger); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); _clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger); @@ -150,9 +151,9 @@ namespace MQTTnet.Server { return; } - + await _clientSessionsManager.StopAsync().ConfigureAwait(false); - + _cancellationTokenSource.Cancel(false); foreach (var adapter in _adapters) diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs index d5f6737..c6c7c4f 100644 --- a/Source/MQTTnet/Server/MqttServerOptions.cs +++ b/Source/MQTTnet/Server/MqttServerOptions.cs @@ -21,7 +21,7 @@ namespace MQTTnet.Server public IMqttServerConnectionValidator ConnectionValidator { get; set; } public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; } - + public IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; set; } public IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; set; } @@ -29,5 +29,7 @@ namespace MQTTnet.Server public IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; set; } public IMqttServerStorage Storage { get; set; } + + public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } } } diff --git a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs index 15126a1..2970fab 100644 --- a/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet/Server/MqttServerOptionsBuilder.cs @@ -57,7 +57,7 @@ namespace MQTTnet.Server _options.DefaultEndpointOptions.IsEnabled = false; return this; } - + public MqttServerOptionsBuilder WithEncryptedEndpoint() { _options.TlsEndpointOptions.IsEnabled = true; @@ -118,13 +118,19 @@ namespace MQTTnet.Server return this; } #endif - + public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) { _options.Storage = value; return this; } + public MqttServerOptionsBuilder WithRetainedMessagesManager(IMqttRetainedMessagesManager value) + { + _options.RetainedMessagesManager = value; + return this; + } + public MqttServerOptionsBuilder WithConnectionValidator(IMqttServerConnectionValidator value) { _options.ConnectionValidator = value; From 55676965d93da80baccdb98fc10a225b0cd3403b Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Tue, 17 Dec 2019 12:39:57 +0100 Subject: [PATCH 2/3] Use IList in IMqttRetainedMessagesManager return values --- Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs | 2 +- Source/MQTTnet/Server/MqttRetainedMessagesManager.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs index 36ace61..6ffdd2b 100644 --- a/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs @@ -16,6 +16,6 @@ namespace MQTTnet.Server Task> GetMessagesAsync(); - Task> GetSubscribedMessagesAsync(ICollection topicFilters); + Task> GetSubscribedMessagesAsync(ICollection topicFilters); } } diff --git a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs index e79bcef..c4e2f96 100644 --- a/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs +++ b/Source/MQTTnet/Server/MqttRetainedMessagesManager.cs @@ -104,7 +104,7 @@ namespace MQTTnet.Server } } - public async Task> GetSubscribedMessagesAsync(ICollection topicFilters) + public async Task> GetSubscribedMessagesAsync(ICollection topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); From 3600ebbc624333d3a92d93ace214c37ef14fa4b6 Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Wed, 18 Dec 2019 10:25:24 +0100 Subject: [PATCH 3/3] Create default MqttRetainedMessagesManager in options --- .../Exceptions/MqttConfigurationException.cs | 21 +++++++++++++++++++ Source/MQTTnet/Server/MqttServer.cs | 5 ++++- Source/MQTTnet/Server/MqttServerOptions.cs | 2 +- 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 Source/MQTTnet/Exceptions/MqttConfigurationException.cs diff --git a/Source/MQTTnet/Exceptions/MqttConfigurationException.cs b/Source/MQTTnet/Exceptions/MqttConfigurationException.cs new file mode 100644 index 0000000..4d10faf --- /dev/null +++ b/Source/MQTTnet/Exceptions/MqttConfigurationException.cs @@ -0,0 +1,21 @@ +using System; + +namespace MQTTnet.Exceptions +{ + public class MqttConfigurationException : Exception + { + protected MqttConfigurationException() + { + } + + public MqttConfigurationException(Exception innerException) + : base(innerException.Message, innerException) + { + } + + public MqttConfigurationException(string message) + : base(message) + { + } + } +} diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index dee2274..4c5ab62 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -7,6 +7,7 @@ using MQTTnet.Adapter; using MQTTnet.Client.Publishing; using MQTTnet.Client.Receiving; using MQTTnet.Diagnostics; +using MQTTnet.Exceptions; using MQTTnet.Protocol; using MQTTnet.Server.Status; @@ -117,11 +118,13 @@ namespace MQTTnet.Server { Options = options ?? throw new ArgumentNullException(nameof(options)); + if (Options.RetainedMessagesManager == null) throw new MqttConfigurationException("options.RetainedMessagesManager should not be null."); + if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started."); _cancellationTokenSource = new CancellationTokenSource(); - _retainedMessagesManager = Options.RetainedMessagesManager ?? new MqttRetainedMessagesManager(); + _retainedMessagesManager = Options.RetainedMessagesManager; await _retainedMessagesManager.Start(Options, _logger); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttServerOptions.cs b/Source/MQTTnet/Server/MqttServerOptions.cs index c6c7c4f..9773e72 100644 --- a/Source/MQTTnet/Server/MqttServerOptions.cs +++ b/Source/MQTTnet/Server/MqttServerOptions.cs @@ -30,6 +30,6 @@ namespace MQTTnet.Server public IMqttServerStorage Storage { get; set; } - public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } + public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager(); } }