Преглед на файлове

Merge pull request #826 from fogzot/develop-iretainedmesagesmanager

Added IMqttRetainedMessagesManager
release/3.x.x
Christian преди 5 години
committed by GitHub
родител
ревизия
35e729bd7b
No known key found for this signature in database GPG ключ ID: 4AEE18F83AFDEB23
променени са 10 файла, в които са добавени 89 реда и са изтрити 34 реда
  1. +21
    -0
      Source/MQTTnet/Exceptions/MqttConfigurationException.cs
  2. +21
    -0
      Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs
  3. +2
    -2
      Source/MQTTnet/Server/IMqttServerOptions.cs
  4. +8
    -8
      Source/MQTTnet/Server/MqttClientConnection.cs
  5. +1
    -1
      Source/MQTTnet/Server/MqttClientSession.cs
  6. +8
    -8
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  7. +7
    -6
      Source/MQTTnet/Server/MqttRetainedMessagesManager.cs
  8. +10
    -6
      Source/MQTTnet/Server/MqttServer.cs
  9. +3
    -1
      Source/MQTTnet/Server/MqttServerOptions.cs
  10. +8
    -2
      Source/MQTTnet/Server/MqttServerOptionsBuilder.cs

+ 21
- 0
Source/MQTTnet/Exceptions/MqttConfigurationException.cs Целия файл

@@ -0,0 +1,21 @@
using System;

namespace MQTTnet.Exceptions
{
public class MqttConfigurationException : Exception
{
protected MqttConfigurationException()
{
}

public MqttConfigurationException(Exception innerException)
: base(innerException.Message, innerException)
{
}

public MqttConfigurationException(string message)
: base(message)
{
}
}
}

+ 21
- 0
Source/MQTTnet/Server/IMqttRetainedMessagesManager.cs Целия файл

@@ -0,0 +1,21 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public interface IMqttRetainedMessagesManager
{
Task Start(IMqttServerOptions options, IMqttNetChildLogger logger);

Task LoadMessagesAsync();

Task ClearMessagesAsync();

Task HandleMessageAsync(string clientId, MqttApplicationMessage applicationMessage);

Task<IList<MqttApplicationMessage>> GetMessagesAsync();

Task<IList<MqttApplicationMessage>> GetSubscribedMessagesAsync(ICollection<TopicFilter> topicFilters);
}
}

+ 2
- 2
Source/MQTTnet/Server/IMqttServerOptions.cs Целия файл

@@ -22,8 +22,8 @@ namespace MQTTnet.Server
MqttServerTcpEndpointOptions DefaultEndpointOptions { get; }
MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; }

IMqttServerStorage Storage { get; }
IMqttServerStorage Storage { get; }

IMqttRetainedMessagesManager RetainedMessagesManager { get; }
}
}

+ 8
- 8
Source/MQTTnet/Server/MqttClientConnection.cs Целия файл

@@ -21,7 +21,7 @@ namespace MQTTnet.Server
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly CancellationTokenSource _cancellationToken = new CancellationTokenSource();

private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly MqttClientKeepAliveMonitor _keepAliveMonitor;
private readonly MqttClientSessionsManager _sessionsManager;

@@ -36,7 +36,7 @@ namespace MQTTnet.Server
private Task<MqttClientDisconnectType> _packageReceiverTask;
private DateTime _lastPacketReceivedTimestamp;
private DateTime _lastNonKeepAlivePacketReceivedTimestamp;
private long _receivedPacketsCount;
private long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
private long _receivedApplicationMessagesCount;
@@ -48,14 +48,14 @@ namespace MQTTnet.Server
MqttClientSession session,
IMqttServerOptions serverOptions,
MqttClientSessionsManager sessionsManager,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
IMqttNetChildLogger logger)
{
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
_channelAdapter = channelAdapter ?? throw new ArgumentNullException(nameof(channelAdapter));
_dataConverter = _channelAdapter.PacketFormatterAdapter.DataConverter;
_endpoint = _channelAdapter.Endpoint;
@@ -76,7 +76,7 @@ namespace MQTTnet.Server
public string ClientId => ConnectPacket.ClientId;

public MqttClientSession Session { get; }
public async Task StopAsync()
{
StopInternal();
@@ -112,7 +112,7 @@ namespace MQTTnet.Server
status.BytesSent = _channelAdapter.BytesSent;
status.BytesReceived = _channelAdapter.BytesReceived;
}
public void Dispose()
{
_cancellationToken.Dispose();
@@ -130,7 +130,7 @@ namespace MQTTnet.Server
try
{
_logger.Info("Client '{0}': Session started.", ClientId);
_channelAdapter.ReadingPacketStartedCallback = OnAdapterReadingPacketStarted;
_channelAdapter.ReadingPacketCompletedCallback = OnAdapterReadingPacketCompleted;

@@ -244,7 +244,7 @@ namespace MQTTnet.Server
_channelAdapter.ReadingPacketCompletedCallback = null;

_logger.Info("Client '{0}': Session stopped.", ClientId);
_packageReceiverTask = null;
}



+ 1
- 1
Source/MQTTnet/Server/MqttClientSession.cs Целия файл

@@ -52,7 +52,7 @@ namespace MQTTnet.Server
ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}

public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, MqttRetainedMessagesManager retainedMessagesManager)
public async Task SubscribeAsync(ICollection<TopicFilter> topicFilters, IMqttRetainedMessagesManager retainedMessagesManager)
{
await SubscriptionsManager.SubscribeAsync(topicFilters).ConfigureAwait(false);



+ 8
- 8
Source/MQTTnet/Server/MqttClientSessionsManager.cs Целия файл

@@ -25,13 +25,13 @@ namespace MQTTnet.Server
private readonly CancellationToken _cancellationToken;
private readonly MqttServerEventDispatcher _eventDispatcher;

private readonly MqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttRetainedMessagesManager _retainedMessagesManager;
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;

public MqttClientSessionsManager(
IMqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager,
IMqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
@@ -72,7 +72,7 @@ namespace MQTTnet.Server
{
var clientStatus = new MqttClientStatus(connection);
connection.FillStatus(clientStatus);
var sessionStatus = new MqttSessionStatus(connection.Session, this);
connection.Session.FillStatus(sessionStatus);
clientStatus.Session = sessionStatus;
@@ -91,7 +91,7 @@ namespace MQTTnet.Server
{
var sessionStatus = new MqttSessionStatus(session, this);
session.FillStatus(sessionStatus);
result.Add(sessionStatus);
}

@@ -259,7 +259,7 @@ namespace MQTTnet.Server
var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);

await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
disconnectType = await connection.RunAsync(connectionValidatorContext).ConfigureAwait(false);
}
catch (OperationCanceledException)
@@ -272,7 +272,7 @@ namespace MQTTnet.Server
finally
{
if (clientWasConnected)
{
{
if (clientId != null)
{
_connections.TryRemove(clientId, out _);
@@ -333,13 +333,13 @@ namespace MQTTnet.Server
{
await existingConnection.StopAsync().ConfigureAwait(false);
}
if (isSessionPresent)
{
if (connectPacket.CleanSession)
{
session = null;
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
}
else


+ 7
- 6
Source/MQTTnet/Server/MqttRetainedMessagesManager.cs Целия файл

@@ -7,20 +7,21 @@ using MQTTnet.Internal;

namespace MQTTnet.Server
{
public class MqttRetainedMessagesManager
public class MqttRetainedMessagesManager : IMqttRetainedMessagesManager
{
private readonly byte[] _emptyArray = new byte[0];
private readonly AsyncLock _messagesLock = new AsyncLock();
private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>();

private readonly IMqttNetChildLogger _logger;
private readonly IMqttServerOptions _options;
private IMqttNetChildLogger _logger;
private IMqttServerOptions _options;

public MqttRetainedMessagesManager(IMqttServerOptions options, IMqttNetChildLogger logger)
public Task Start(IMqttServerOptions options, IMqttNetChildLogger logger)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttRetainedMessagesManager));
_options = options ?? throw new ArgumentNullException(nameof(options));
return Task.CompletedTask;
}

public async Task LoadMessagesAsync()
@@ -103,7 +104,7 @@ namespace MQTTnet.Server
}
}

public async Task<List<MqttApplicationMessage>> GetSubscribedMessagesAsync(ICollection<TopicFilter> topicFilters)
public async Task<IList<MqttApplicationMessage>> GetSubscribedMessagesAsync(ICollection<TopicFilter> topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

@@ -128,7 +129,7 @@ namespace MQTTnet.Server
break;
}
}
return matchingRetainedMessages;
}



+ 10
- 6
Source/MQTTnet/Server/MqttServer.cs Целия файл

@@ -7,6 +7,7 @@ using MQTTnet.Adapter;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using MQTTnet.Server.Status;

@@ -19,7 +20,7 @@ namespace MQTTnet.Server
private readonly IMqttNetChildLogger _logger;

private MqttClientSessionsManager _clientSessionsManager;
private MqttRetainedMessagesManager _retainedMessagesManager;
private IMqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource;

public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetChildLogger logger)
@@ -48,7 +49,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientDisconnectedHandler;
set => _eventDispatcher.ClientDisconnectedHandler = value;
}
public IMqttServerClientSubscribedTopicHandler ClientSubscribedTopicHandler
{
get => _eventDispatcher.ClientSubscribedTopicHandler;
@@ -60,7 +61,7 @@ namespace MQTTnet.Server
get => _eventDispatcher.ClientUnsubscribedTopicHandler;
set => _eventDispatcher.ClientUnsubscribedTopicHandler = value;
}
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => _eventDispatcher.ApplicationMessageReceivedHandler;
@@ -117,11 +118,14 @@ namespace MQTTnet.Server
{
Options = options ?? throw new ArgumentNullException(nameof(options));

if (Options.RetainedMessagesManager == null) throw new MqttConfigurationException("options.RetainedMessagesManager should not be null.");

if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
_retainedMessagesManager = Options.RetainedMessagesManager;
await _retainedMessagesManager.Start(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);

_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _logger);
@@ -150,9 +154,9 @@ namespace MQTTnet.Server
{
return;
}
await _clientSessionsManager.StopAsync().ConfigureAwait(false);
_cancellationTokenSource.Cancel(false);

foreach (var adapter in _adapters)


+ 3
- 1
Source/MQTTnet/Server/MqttServerOptions.cs Целия файл

@@ -21,7 +21,7 @@ namespace MQTTnet.Server
public IMqttServerConnectionValidator ConnectionValidator { get; set; }

public IMqttServerApplicationMessageInterceptor ApplicationMessageInterceptor { get; set; }
public IMqttServerClientMessageQueueInterceptor ClientMessageQueueInterceptor { get; set; }

public IMqttServerSubscriptionInterceptor SubscriptionInterceptor { get; set; }
@@ -29,5 +29,7 @@ namespace MQTTnet.Server
public IMqttServerUnsubscriptionInterceptor UnsubscriptionInterceptor { get; set; }

public IMqttServerStorage Storage { get; set; }

public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager();
}
}

+ 8
- 2
Source/MQTTnet/Server/MqttServerOptionsBuilder.cs Целия файл

@@ -57,7 +57,7 @@ namespace MQTTnet.Server
_options.DefaultEndpointOptions.IsEnabled = false;
return this;
}
public MqttServerOptionsBuilder WithEncryptedEndpoint()
{
_options.TlsEndpointOptions.IsEnabled = true;
@@ -118,13 +118,19 @@ namespace MQTTnet.Server
return this;
}
#endif
public MqttServerOptionsBuilder WithStorage(IMqttServerStorage value)
{
_options.Storage = value;
return this;
}

public MqttServerOptionsBuilder WithRetainedMessagesManager(IMqttRetainedMessagesManager value)
{
_options.RetainedMessagesManager = value;
return this;
}

public MqttServerOptionsBuilder WithConnectionValidator(IMqttServerConnectionValidator value)
{
_options.ConnectionValidator = value;


Зареждане…
Отказ
Запис