Browse Source

Improve session takeover and keep alive handling.

release/3.x.x
Christian Kratky 4 years ago
parent
commit
24f0ea01f5
8 changed files with 148 additions and 118 deletions
  1. +3
    -0
      Build/MQTTnet.nuspec
  2. +12
    -6
      Source/MQTTnet/Server/MqttClientConnection.cs
  3. +11
    -0
      Source/MQTTnet/Server/MqttClientConnectionStatus.cs
  4. +80
    -95
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  5. +10
    -5
      Source/MQTTnet/Server/MqttServer.cs
  6. +7
    -6
      Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs
  7. +2
    -1
      Source/MQTTnet/Server/Status/MqttClientStatus.cs
  8. +23
    -5
      Tests/MQTTnet.Core.Tests/Server_KeepAlive_Tests.cs

+ 3
- 0
Build/MQTTnet.nuspec View File

@@ -13,6 +13,9 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.</description>
<releaseNotes>
* [Core] Added support for TLS 1.3 (requires .NET Core 3.1+) (thanks to @Dvergatal).
* [Server] Reduced async tasks count by moving dedicated keep alive tasks per connection to shared one.
* [Server] Session takeover and keep alive timeout are now properly set in DISCONNECT packet.
* [Server] Performance improvements.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>


+ 12
- 6
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -41,6 +41,7 @@ namespace MQTTnet.Server
long _sentPacketsCount = 1; // Start with 1 because the CONNECT packet is not counted anywhere.
long _receivedApplicationMessagesCount;
long _sentApplicationMessagesCount;
MqttDisconnectReasonCode _disconnectReason;

public MqttClientConnection(MqttConnectPacket connectPacket,
IMqttChannelAdapter channelAdapter,
@@ -70,9 +71,9 @@ namespace MQTTnet.Server
_lastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp;
}

public MqttConnectPacket ConnectPacket { get; }
public MqttClientConnectionStatus Status { get; private set; } = MqttClientConnectionStatus.Initializing;

public bool IsTakeOver { get; set; }
public MqttConnectPacket ConnectPacket { get; }

public string ClientId => ConnectPacket.ClientId;

@@ -82,9 +83,12 @@ namespace MQTTnet.Server

public MqttClientSession Session { get; }

public async Task StopAsync()
public async Task StopAsync(MqttDisconnectReasonCode reason)
{
if (IsTakeOver)
Status = MqttClientConnectionStatus.Finalizing;
_disconnectReason = reason;

if (reason == MqttDisconnectReasonCode.SessionTakenOver || reason == MqttDisconnectReasonCode.KeepAliveTimeout)
{
// Is is very important to send the DISCONNECT packet here BEFORE cancelling the
// token because the entire connection is closed (disposed) as soon as the cancellation
@@ -94,7 +98,7 @@ namespace MQTTnet.Server
{
await _channelAdapter.SendPacketAsync(new MqttDisconnectPacket
{
ReasonCode = MqttDisconnectReasonCode.SessionTakenOver
ReasonCode = reason
}, _serverOptions.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception exception)
@@ -161,6 +165,8 @@ namespace MQTTnet.Server

while (!cancellationToken.IsCancellationRequested)
{
Status = MqttClientConnectionStatus.Running;

var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
if (packet == null)
{
@@ -242,7 +248,7 @@ namespace MQTTnet.Server
}
finally
{
if (IsTakeOver)
if (_disconnectReason == MqttDisconnectReasonCode.SessionTakenOver)
{
disconnectType = MqttClientDisconnectType.Takeover;
}


+ 11
- 0
Source/MQTTnet/Server/MqttClientConnectionStatus.cs View File

@@ -0,0 +1,11 @@
namespace MQTTnet.Server
{
public enum MqttClientConnectionStatus
{
Initializing,

Running,

Finalizing
}
}

+ 80
- 95
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -25,7 +25,6 @@ namespace MQTTnet.Server

readonly IDictionary<object, object> _serverSessionItems = new ConcurrentDictionary<object, object>();

readonly CancellationToken _cancellationToken;
readonly MqttServerEventDispatcher _eventDispatcher;

readonly IMqttRetainedMessagesManager _retainedMessagesManager;
@@ -36,12 +35,9 @@ namespace MQTTnet.Server
public MqttClientSessionsManager(
IMqttServerOptions options,
IMqttRetainedMessagesManager retainedMessagesManager,
CancellationToken cancellationToken,
MqttServerEventDispatcher eventDispatcher,
IMqttNetLogger logger)
{
_cancellationToken = cancellationToken;

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateScopedLogger(nameof(MqttClientSessionsManager));
_rootLogger = logger;
@@ -51,9 +47,55 @@ namespace MQTTnet.Server
_retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException(nameof(retainedMessagesManager));
}

public void Start()
public void Start(CancellationToken cancellation)
{
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger);
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(cancellation), cancellation).Forget(_logger);
}

public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
try
{
MqttConnectPacket connectPacket;
try
{
var firstPacket = await channelAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
connectPacket = firstPacket as MqttConnectPacket;
if (connectPacket == null)
{
_logger.Warning(null, "The first packet from client '{0}' was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
return;
}
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning(null, "Client '{0}' connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
return;
}

var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, _options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);

return;
}

var connection = CreateClientConnection(connectPacket, connectionValidatorContext, channelAdapter);
await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket.ClientId).ConfigureAwait(false);
await connection.RunAsync().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, exception.Message);
}
}

public async Task CloseAllConnectionsAsync()
@@ -67,7 +109,7 @@ namespace MQTTnet.Server

foreach (var connection in connections)
{
await connection.StopAsync().ConfigureAwait(false);
await connection.StopAsync(MqttDisconnectReasonCode.NormalDisconnection).ConfigureAwait(false);
}
}

@@ -78,14 +120,7 @@ namespace MQTTnet.Server
return _connections.Values.ToList();
}
}

public Task HandleClientConnectionAsync(IMqttChannelAdapter clientAdapter)
{
if (clientAdapter is null) throw new ArgumentNullException(nameof(clientAdapter));

return HandleClientConnectionAsync(clientAdapter, _cancellationToken);
}

public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
{
var result = new List<IMqttClientStatus>();
@@ -164,12 +199,39 @@ namespace MQTTnet.Server

if (connection != null)
{
await connection.StopAsync().ConfigureAwait(false);
await connection.StopAsync(MqttDisconnectReasonCode.NormalDisconnection).ConfigureAwait(false);
}
_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public async Task CleanUpClient(string clientId, IMqttChannelAdapter channelAdapter, MqttClientDisconnectType disconnectType)
{
if (clientId != null)
{
// in case it is a takeover _connections already contains the new connection
if (disconnectType != MqttClientDisconnectType.Takeover)
{
lock (_connections)
{
_connections.Remove(clientId);
}

if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
}
}

await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false);

if (clientId != null)
{
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
}
}

public void Dispose()
{
_messageQueue?.Dispose();
@@ -220,7 +282,7 @@ namespace MQTTnet.Server
{
if (sender != null)
{
await sender.StopAsync().ConfigureAwait(false);
await sender.StopAsync(MqttDisconnectReasonCode.NormalDisconnection).ConfigureAwait(false);
}
}

@@ -278,79 +340,6 @@ namespace MQTTnet.Server
}
}

async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
try
{
MqttConnectPacket connectPacket;
try
{
var firstPacket = await channelAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);
connectPacket = firstPacket as MqttConnectPacket;
if (connectPacket == null)
{
_logger.Warning(null, "The first packet from client '{0}' was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint);
return;
}
}
catch (MqttCommunicationTimedOutException)
{
_logger.Warning(null, "Client '{0}' connected but did not sent a CONNECT packet.", channelAdapter.Endpoint);
return;
}

var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(connectionValidatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, _options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);

return;
}

var connection = CreateClientConnection(connectPacket, connectionValidatorContext, channelAdapter);
await _eventDispatcher.SafeNotifyClientConnectedAsync(connectPacket.ClientId).ConfigureAwait(false);
await connection.RunAsync().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error(exception, exception.Message);
}
}

public async Task CleanUpClient(string clientId, IMqttChannelAdapter channelAdapter, MqttClientDisconnectType disconnectType)
{
if (clientId != null)
{
// in case it is a takeover _connections already contains the new connection
if (disconnectType != MqttClientDisconnectType.Takeover)
{
lock (_connections)
{
_connections.Remove(clientId);
}

if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
}
}

await SafeCleanupChannelAsync(channelAdapter).ConfigureAwait(false);

if (clientId != null)
{
await _eventDispatcher.SafeNotifyClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
}
}

async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary<object, object>());
@@ -417,11 +406,7 @@ namespace MQTTnet.Server
_connections[connectPacket.ClientId] = connection;
}

if (existingConnection != null)
{
existingConnection.IsTakeOver = true;
existingConnection.StopAsync().GetAwaiter().GetResult();
}
existingConnection?.StopAsync(MqttDisconnectReasonCode.SessionTakenOver).GetAwaiter().GetResult();

return connection;
}


+ 10
- 5
Source/MQTTnet/Server/MqttServer.cs View File

@@ -24,6 +24,7 @@ namespace MQTTnet.Server

MqttClientSessionsManager _clientSessionsManager;
IMqttRetainedMessagesManager _retainedMessagesManager;
MqttServerKeepAliveMonitor _keepAliveMonitor;
CancellationTokenSource _cancellationTokenSource;

public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
@@ -153,18 +154,22 @@ namespace MQTTnet.Server
Options = options ?? throw new ArgumentNullException(nameof(options));
_cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = _cancellationTokenSource.Token;

_retainedMessagesManager = Options.RetainedMessagesManager ?? throw new MqttConfigurationException("options.RetainedMessagesManager should not be null.");

await _retainedMessagesManager.Start(Options, _rootLogger).ConfigureAwait(false);
await _retainedMessagesManager.LoadMessagesAsync().ConfigureAwait(false);

_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _cancellationTokenSource.Token, _eventDispatcher, _rootLogger);
_clientSessionsManager.Start();
_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _eventDispatcher, _rootLogger);
_clientSessionsManager.Start(cancellationToken);

_keepAliveMonitor = new MqttServerKeepAliveMonitor(Options, _clientSessionsManager, _rootLogger);
_keepAliveMonitor.Start(cancellationToken);

foreach (var adapter in _adapters)
{
adapter.ClientHandler = OnHandleClient;
adapter.ClientHandler = c => OnHandleClient(c, cancellationToken);
await adapter.StartAsync(Options).ConfigureAwait(false);
}

@@ -231,9 +236,9 @@ namespace MQTTnet.Server
base.Dispose(disposing);
}

Task OnHandleClient(IMqttChannelAdapter channelAdapter)
Task OnHandleClient(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
return _clientSessionsManager.HandleClientConnectionAsync(channelAdapter);
return _clientSessionsManager.HandleClientConnectionAsync(channelAdapter, cancellationToken);
}

void ThrowIfStarted()


+ 7
- 6
Source/MQTTnet/Server/MqttServerKeepAliveMonitor.cs View File

@@ -4,6 +4,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Implementations;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
@@ -69,11 +70,11 @@ namespace MQTTnet.Server
{
try
{
//if (connection.IsStopped)
//{
// // The connection is already dead so there is no need to check it.
// return;
//}
if (connection.Status != MqttClientConnectionStatus.Running)
{
// The connection is already dead or just created so there is no need to check it.
return;
}

if (connection.ConnectPacket.KeepAlivePeriod == 0)
{
@@ -104,7 +105,7 @@ namespace MQTTnet.Server

// Execute the disconnection in background so that the keep alive monitor can continue
// with checking other connections.
Task.Run(() => connection.StopAsync());
Task.Run(() => connection.StopAsync(MqttDisconnectReasonCode.KeepAliveTimeout));
}
catch (Exception exception)
{


+ 2
- 1
Source/MQTTnet/Server/Status/MqttClientStatus.cs View File

@@ -1,6 +1,7 @@
using MQTTnet.Formatter;
using System;
using System.Threading.Tasks;
using MQTTnet.Protocol;

namespace MQTTnet.Server.Status
{
@@ -41,7 +42,7 @@ namespace MQTTnet.Server.Status

public Task DisconnectAsync()
{
return _connection.StopAsync();
return _connection.StopAsync(MqttDisconnectReasonCode.NormalDisconnection);
}

public void ResetStatistics()


+ 23
- 5
Tests/MQTTnet.Core.Tests/Server_KeepAlive_Tests.cs View File

@@ -2,7 +2,9 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
@@ -17,7 +19,9 @@ namespace MQTTnet.Tests
{
var server = await testEnvironment.StartServerAsync();

var client = await testEnvironment.ConnectLowLevelClientAsync(o => o.WithCommunicationTimeout(TimeSpan.FromSeconds(2))).ConfigureAwait(false);
var client = await testEnvironment.ConnectLowLevelClientAsync(o => o
.WithCommunicationTimeout(TimeSpan.FromSeconds(2))
.WithProtocolVersion(MqttProtocolVersion.V500)).ConfigureAwait(false);

await client.SendAsync(new MqttConnectPacket
{
@@ -32,11 +36,23 @@ namespace MQTTnet.Tests

await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
var responsePacket = await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(responsePacket is MqttPingRespPacket);

await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
responsePacket = await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(responsePacket is MqttPingRespPacket);

await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
responsePacket = await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(responsePacket is MqttPingRespPacket);

await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
await Task.Delay(500);
responsePacket = await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(responsePacket is MqttPingRespPacket);

// If we reach this point everything works as expected (server did not close the connection
// due to proper ping messages.
@@ -44,14 +60,16 @@ namespace MQTTnet.Tests

await Task.Delay(1200);
await client.SendAsync(MqttPingReqPacket.Instance, CancellationToken.None);
responsePacket = await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(responsePacket is MqttPingRespPacket);

// Now we will wait longer than 1.5 so that the server will close the connection.
responsePacket = await client.ReceiveAsync(CancellationToken.None);

await Task.Delay(3000);

await server.StopAsync();
var disconnectPacket = responsePacket as MqttDisconnectPacket;

await client.ReceiveAsync(CancellationToken.None);
Assert.IsTrue(disconnectPacket != null);
Assert.AreEqual(disconnectPacket.ReasonCode, MqttDisconnectReasonCode.KeepAliveTimeout);
}
}
}

Loading…
Cancel
Save