Browse Source

Added IMqttRetainedMessagesManager

* Added interface IMqttRetainedMessagesManager to allow for different
retained messages manager implementations. The interface copies almost
exactly the current MqttRetainedMessagesManager implementation.

* Added IMqttRetainedMessagesManager.Start() to configure the provided
IMqttRetainedMessagesManager outside the constructor. This method returns
Task because some implementations that use external storage may be slow on
initialization (e.g., connect to a database).

* Modified MqttRetainedMessagesManager to implement new interface.
release/3.x.x
Federico Di Gregorio 5 years ago
parent
commit
f949d54aef
9 changed files with 64 additions and 33 deletions
  1. +21
    -0
      Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
  2. +2
    -2
      Source/MQTTnet/Server/IMqttServerOptions.cs
  3. +8
    -8
      Source/MQTTnet/Server/MqttClientConnection.cs
  4. +1
    -1
      Source/MQTTnet/Server/MqttClientSession.cs
  5. +8
    -8
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  6. +6
    -5
      Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
  7. +7
    -6
      Source/MQTTnet/Server/MqttServer.cs
  8. +3
    -1
      Source/MQTTnet/Server/MqttServerOptions.cs
  9. +8
    -2
      Source/MQTTnet/Server/MqttServerOptionsBuilder.cs

+ 21
- 0
Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs View File

@@ -0,0 +1,21 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public interface IMqttRetainedMessagesManager
{
Task Start(IMqttServerOptions options, IMqttNetChildLogger logger);

Task LoadMessagesAsync();

Task ClearMessagesAsync();

Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);

Task<IList<MqttApplicationMessage>> GetMessagesAsync();

Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(ICollection<TopicFilter> topicFilters);
}
}

+ 2
- 2
Source/MQTTnet/Server/IMqttServerOptions.cs View File

@@ -22,8 +22,8 @@ namespace MQTTnet.Server
MqttServerTcpEndpointOptions DefaultEndpointOptions { get; } MqttServerTcpEndpointOptions DefaultEndpointOptions { get; }
MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; } MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; }


IMqttServerStorage Storage { get; }
IMqttServerStorage Storage { get; }


IMqttRetainedMessagesManager RetainedMessagesManager { get; }
} }
} }

+ 8
- 8
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -21,7 +21,7 @@ namespace MQTTnet.Server
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource(); private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();


private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor; private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientSessionsManager _sessionsManager; private readonly MqttClientSessionsManager _sessionsManager;


@@ -36,7 +36,7 @@ namespace MQTTnet.Server
private Task<MqttClientDisconnectType> _packageReceiverTask; private Task<MqttClientDisconnectType> _packageReceiverTask;
private DateTime _lastPacketReceivedTimestamp; private DateTime _lastPacketReceivedTimestamp;
private DateTime _lastNonKeepAlivePacketReceivedTimestamp; private DateTime _lastNonKeepAlivePacketReceivedTimestamp;
private long _receivedPacketsCount; private long _receivedPacketsCount;
private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere. private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
private long _receivedApplicationMessagesCount; private long _receivedApplicationMessagesCount;
@@ -48,14 +48,14 @@ namespace MQTTnet.Server
MqttClientSession session, MqttClientSession session,
IMqttServerOptions serverOptions, IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager, MqttClientSessionsManager sessionsManager,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetChildLogger logger) IMqttNetChildLogger logger)
{ {
Session = session ?? throw new ArgumentNullException(nameof(session)); Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager)); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter)); _channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
_dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter; _dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
_endpoint = _channelAdapter.Endpoint; _endpoint = _channelAdapter.Endpoint;
@@ -76,7 +76,7 @@ namespace MQTTnet.Server
public string ClientId => ConnectPacket.ClientId; public string ClientId => ConnectPacket.ClientId;


public MqttClientSession Session { get; } public MqttClientSession Session { get; }
public async Task StopAsync() public async Task StopAsync()
{ {
StopInternal(); StopInternal();
@@ -112,7 +112,7 @@ namespace MQTTnet.Server
status.BytesSent = _channelAdapter.BytesSent; status.BytesSent = _channelAdapter.BytesSent;
status.BytesReceived = _channelAdapter.BytesReceived; status.BytesReceived = _channelAdapter.BytesReceived;
} }
public void Dispose() public void Dispose()
{ {
_cancellationToken.Dispose(); _cancellationToken.Dispose();
@@ -130,7 +130,7 @@ namespace MQTTnet.Server
try try
{ {
_logger.Info("Client '{0}': Session started.", ClientId); _logger.Info("Client '{0}': Session started.", ClientId);
_channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted; _channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
_channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted; _channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted;


@@ -244,7 +244,7 @@ namespace MQTTnet.Server
_channelAdapter.ReadingPacketCompletedCallback = null; _channelAdapter.ReadingPacketCompletedCallback = null;


_logger.Info("Client '{0}': Session stopped.", ClientId); _logger.Info("Client '{0}': Session stopped.", ClientId);
_packageReceiverTask = null; _packageReceiverTask = null;
} }




+ 1
- 1
Source/MQTTnet/Server/MqttClientSession.cs View File

@@ -52,7 +52,7 @@ namespace MQTTnet.Server
ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage); ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
} }


public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, MqttRetainedMessagesManager retainedMessagesManager)
public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, IMqttRetainedMessagesManager retainedMessagesManager)
{ {
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false); await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);




+ 8
- 8
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -25,13 +25,13 @@ namespace MQTTnet.Server
private readonly CancellationToken _cancellationToken; private readonly CancellationToken _cancellationToken;
private readonly MqttServerEventDispatcher _eventDispatcher; private readonly MqttServerEventDispatcher _eventDispatcher;


private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options; private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger; private readonly IMqttNetChildLogger _logger;


public MqttClientSessionsManager( public MqttClientSessionsManager(
IMqttServerOptions options, IMqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken, CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher, MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger) IMqttNetChildLogger logger)
@@ -72,7 +72,7 @@ namespace MQTTnet.Server
{ {
var clientStatus = new MqttClientStatus(connection); var clientStatus = new MqttClientStatus(connection);
connection.FillStatus(clientStatus); connection.FillStatus(clientStatus);
var sessionStatus = new MqttSessionStatus(connection.Session, this); var sessionStatus = new MqttSessionStatus(connection.Session, this);
connection.Session.FillStatus(sessionStatus); connection.Session.FillStatus(sessionStatus);
clientStatus.Session = sessionStatus; clientStatus.Session = sessionStatus;
@@ -91,7 +91,7 @@ namespace MQTTnet.Server
{ {
var sessionStatus = new MqttSessionStatus(session, this); var sessionStatus = new MqttSessionStatus(session, this);
session.FillStatus(sessionStatus); session.FillStatus(sessionStatus);
result.Add(sessionStatus); result.Add(sessionStatus);
} }


@@ -259,7 +259,7 @@ namespace MQTTnet.Server
var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);


await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false); disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@@ -272,7 +272,7 @@ namespace MQTTnet.Server
finally finally
{ {
if (clientWasConnected) if (clientWasConnected)
{
{
if (clientId != null) if (clientId != null)
{ {
_connections.TryRemove(clientId, out _); _connections.TryRemove(clientId, out _);
@@ -333,13 +333,13 @@ namespace MQTTnet.Server
{ {
await existingConnection.StopAsync().ConfigureAwait(false); await existingConnection.StopAsync().ConfigureAwait(false);
} }
if (isSessionPresent) if (isSessionPresent)
{ {
if (connectPacket.CleanSession) if (connectPacket.CleanSession)
{ {
session = null; session = null;
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
} }
else else


+ 6
- 5
Source/MQTTnet/Server/MqttRetainedMessagesManager.cs View File

@@ -7,20 +7,21 @@ using MQTTnet.Internal;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttRetainedMessagesManager
public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager
{ {
private readonly byte[] _emptyArray = new byte[0]; private readonly byte[] _emptyArray = new byte[0];
private readonly AsyncLock _messagesLock = new AsyncLock(); private readonly AsyncLock _messagesLock = new AsyncLock();
private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>(); private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>();


private readonly IMqttNetChildLogger _logger;
private readonly IMqttServerOptions _options;
private IMqttNetChildLogger _logger;
private IMqttServerOptions _options;


public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetChildLogger logger)
public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger)
{ {
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager)); _logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager));
_options = options ?? throw new ArgumentNullException(nameof(options)); _options = options ?? throw new ArgumentNullException(nameof(options));
return Task.CompletedTask;
} }


public async Task LoadMessagesAsync() public async Task LoadMessagesAsync()
@@ -128,7 +129,7 @@ namespace MQTTnet.Server
break; break;
} }
} }
return matchingRetainedMessages; return matchingRetainedMessages;
} }




+ 7
- 6
Source/MQTTnet/Server/MqttServer.cs View File

@@ -19,7 +19,7 @@ namespace MQTTnet.Server
private readonly IMqttNetChildLogger _logger; private readonly IMqttNetChildLogger _logger;


private MqttClientSessionsManager _clientSessionsManager; private MqttClientSessionsManager _clientSessionsManager;
private MqttRetainedMessagesManager _retainedMessagesManager;
private IMqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;


public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetChildLogger logger) public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetChildLogger logger)
@@ -48,7 +48,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientDisconnectedHandler; get => _eventDispatcher.ClientDisconnectedHandler;
set => _eventDispatcher.ClientDisconnectedHandler = value; set => _eventDispatcher.ClientDisconnectedHandler = value;
} }
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{ {
get => _eventDispatcher.ClientSubscribedTopicHandler; get => _eventDispatcher.ClientSubscribedTopicHandler;
@@ -60,7 +60,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientUnsubscribedTopicHandler; get => _eventDispatcher.ClientUnsubscribedTopicHandler;
set => _eventDispatcher.ClientUnsubscribedTopicHandler = value; set => _eventDispatcher.ClientUnsubscribedTopicHandler = value;
} }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{ {
get => _eventDispatcher.ApplicationMessageReceivedHandler; get => _eventDispatcher.ApplicationMessageReceivedHandler;
@@ -121,7 +121,8 @@ namespace MQTTnet.Server


_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
_retainedMessagesManager = Options.RetainedMessagesManager ?? new MqttRetainedMessagesManager();
await _retainedMessagesManager.Start(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false); await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);


_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger); _clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
@@ -150,9 +151,9 @@ namespace MQTTnet.Server
{ {
return; return;
} }
await _clientSessionsManager.StopAsync().ConfigureAwait(false); await _clientSessionsManager.StopAsync().ConfigureAwait(false);
_cancellationTokenSource.Cancel(false); _cancellationTokenSource.Cancel(false);


foreach (var adapter in _adapters) foreach (var adapter in _adapters)


+ 3
- 1
Source/MQTTnet/Server/MqttServerOptions.cs View File

@@ -21,7 +21,7 @@ namespace MQTTnet.Server
public IMqttServerConnectionValidator ConnectionValidator { get; set; } public IMqttServerConnectionValidator ConnectionValidator { get; set; }


public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; } public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; }
public IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; set; } public IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; set; }


public IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; set; } public IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; set; }
@@ -29,5 +29,7 @@ namespace MQTTnet.Server
public IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; set; } public IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; set; }


public IMqttServerStorage Storage { get; set; } public IMqttServerStorage Storage { get; set; }

public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; }
} }
} }

+ 8
- 2
Source/MQTTnet/Server/MqttServerOptionsBuilder.cs View File

@@ -57,7 +57,7 @@ namespace MQTTnet.Server
_options.DefaultEndpointOptions.IsEnabled = false; _options.DefaultEndpointOptions.IsEnabled = false;
return this; return this;
} }
public MqttServerOptionsBuilder WithEncryptedEndpoint() public MqttServerOptionsBuilder WithEncryptedEndpoint()
{ {
_options.TlsEndpointOptions.IsEnabled = true; _options.TlsEndpointOptions.IsEnabled = true;
@@ -118,13 +118,19 @@ namespace MQTTnet.Server
return this; return this;
} }
#endif #endif
public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value) public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value)
{ {
_options.Storage = value; _options.Storage = value;
return this; return this;
} }


public MqttServerOptionsBuilder WithRetainedMessagesManager(IMqttRetainedMessagesManager value)
{
_options.RetainedMessagesManager = value;
return this;
}

public MqttServerOptionsBuilder WithConnectionValidator(IMqttServerConnectionValidator value) public MqttServerOptionsBuilder WithConnectionValidator(IMqttServerConnectionValidator value)
{ {
_options.ConnectionValidator = value; _options.ConnectionValidator = value;


Loading…
Cancel
Save