Browse Source

Refactor keep alive monitoring and add unit tests.

release/3.x.x
Christian 6 years ago
parent
commit
f85ad4965b
8 changed files with 182 additions and 76 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs
  3. +88
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs
  4. +14
    -67
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  5. +2
    -2
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  6. +8
    -7
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  7. +1
    -0
      README.md
  8. +66
    -0
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -14,6 +14,7 @@
* [Client] The _ManagedClient_ now supports unsubscribing (thanks to @lerppana)
* [Server] Fixed some minor async issues.
* [Server] Fixed wrong comparison of the topic and QoS for retained messages.
* [Server] Added a property which provides access to the used options (read only).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/IMqttServer.cs View File

@@ -13,6 +13,8 @@ namespace MQTTnet.Server
event EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic;
event EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;
IMqttServerOptions Options { get; }

Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync();

Task SubscribeAsync(string clientId, IList<TopicFilter> topicFilters);


+ 88
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientKeepAliveMonitor.cs View File

@@ -0,0 +1,88 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;

namespace MQTTnet.Server
{
public sealed class MqttClientKeepAliveMonitor
{
private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();
private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = new Stopwatch();

private readonly string _clientId;
private readonly Func<Task> _timeoutCallback;
private readonly IMqttNetLogger _logger;

public MqttClientKeepAliveMonitor(string clientId, Func<Task> timeoutCallback, IMqttNetLogger logger)
{
_clientId = clientId;
_timeoutCallback = timeoutCallback;
_logger = logger;
}

public TimeSpan LastPacketReceived => _lastPacketReceivedTracker.Elapsed;

public TimeSpan LastNonKeepAlivePacketReceived => _lastNonKeepAlivePacketReceivedTracker.Elapsed;

public void Start(int keepAlivePeriod, CancellationToken cancellationToken)
{
if (keepAlivePeriod == 0)
{
return;
}

Task.Run(async () => await RunAsync(keepAlivePeriod, cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
}

private async Task RunAsync(int keepAlivePeriod, CancellationToken cancellationToken)
{
try
{
_lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart();

while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod * 1.5D)
{
_logger.Warning<MqttClientSession>("Client '{0}': Did not receive any packet or keep alive signal.", _clientId);

if (_timeoutCallback != null)
{
await _timeoutCallback().ConfigureAwait(false);
}

return;
}

await Task.Delay(keepAlivePeriod, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
}
finally
{
_logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId);
}
}

public void PacketReceived(MqttBasePacket packet)
{
_lastPacketReceivedTracker.Restart();

if (!(packet is MqttPingReqPacket))
{
_lastNonKeepAlivePacketReceivedTracker.Restart();
}
}
}
}

+ 14
- 67
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
@@ -15,12 +14,8 @@ namespace MQTTnet.Server
{
public sealed class MqttClientSession : IDisposable
{
private readonly Stopwatch _lastPacketReceivedTracker = Stopwatch.StartNew();
private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = Stopwatch.StartNew();

private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger;

private readonly MqttRetainedMessagesManager _retainedMessagesManager;

private IMqttChannelAdapter _adapter;
@@ -38,7 +33,8 @@ namespace MQTTnet.Server
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

ClientId = clientId;

KeepAliveMonitor = new MqttClientKeepAliveMonitor(clientId, StopDueToKeepAliveTimeoutAsync, _logger);
SubscriptionsManager = new MqttClientSubscriptionsManager(_options, clientId);
PendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
}
@@ -49,14 +45,12 @@ namespace MQTTnet.Server

public MqttClientPendingMessagesQueue PendingMessagesQueue { get; }

public MqttClientKeepAliveMonitor KeepAliveMonitor { get; }

public string ClientId { get; }

public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;

public TimeSpan LastPacketReceived => _lastPacketReceivedTracker.Elapsed;

public TimeSpan LastNonKeepAlivePacketReceived => _lastNonKeepAlivePacketReceivedTracker.Elapsed;

public bool IsConnected => _adapter != null;

public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
@@ -73,14 +67,7 @@ namespace MQTTnet.Server
_cancellationTokenSource = cancellationTokenSource;

PendingMessagesQueue.Start(adapter, cancellationTokenSource.Token);

_lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart();

if (connectPacket.KeepAlivePeriod > 0)
{
StartCheckingKeepAliveTimeout(TimeSpan.FromSeconds(connectPacket.KeepAlivePeriod), cancellationTokenSource.Token);
}
KeepAliveMonitor.Start(connectPacket.KeepAlivePeriod, cancellationTokenSource.Token);

await ReceivePacketsAsync(adapter, cancellationTokenSource.Token).ConfigureAwait(false);
}
@@ -123,7 +110,7 @@ namespace MQTTnet.Server
var willMessage = _willMessage;
if (willMessage != null)
{
_willMessage = null; //clear willmessage so it is send just once
_willMessage = null; // clear willmessage so it is send just once
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false);
}
}
@@ -161,12 +148,10 @@ namespace MQTTnet.Server
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

var response = SubscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket
return SubscriptionsManager.UnsubscribeAsync(new MqttUnsubscribePacket
{
TopicFilters = topicFilters
});

return response;
}

public void Dispose()
@@ -179,6 +164,12 @@ namespace MQTTnet.Server
_cancellationTokenSource?.Dispose();
}

private Task StopDueToKeepAliveTimeoutAsync()
{
_logger.Info<MqttClientSession>("Client '{0}': Timeout while waiting for KeepAlive packet.", ClientId);
return StopAsync();
}

private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
try
@@ -186,14 +177,7 @@ namespace MQTTnet.Server
while (!cancellationToken.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

_lastPacketReceivedTracker.Restart();

if (!(packet is MqttPingReqPacket))
{
_lastNonKeepAlivePacketReceivedTracker.Restart();
}

KeepAliveMonitor.PacketReceived(packet);
await ProcessReceivedPacketAsync(adapter, packet, cancellationToken).ConfigureAwait(false);
}
}
@@ -335,42 +319,5 @@ namespace MQTTnet.Server
var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier };
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response);
}

private void StartCheckingKeepAliveTimeout(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
{
Task.Run(
async () => await CheckKeepAliveTimeoutAsync(keepAlivePeriod, cancellationToken).ConfigureAwait(false)
, cancellationToken);
}

private async Task CheckKeepAliveTimeoutAsync(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod.TotalSeconds * 1.5D)
{
_logger.Warning<MqttClientSession>("Client '{0}': Did not receive any packet or keep alive signal.", ClientId);
await StopAsync();
return;
}

await Task.Delay(keepAlivePeriod, cancellationToken);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", ClientId);
}
finally
{
_logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", ClientId);
}
}
}
}

+ 2
- 2
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -129,8 +129,8 @@ namespace MQTTnet.Server
{
ClientId = s.Value.ClientId,
ProtocolVersion = s.Value.ProtocolVersion ?? MqttProtocolVersion.V311,
LastPacketReceived = s.Value.LastPacketReceived,
LastNonKeepAlivePacketReceived = s.Value.LastNonKeepAlivePacketReceived,
LastPacketReceived = s.Value.KeepAliveMonitor.LastPacketReceived,
LastNonKeepAlivePacketReceived = s.Value.KeepAliveMonitor.LastNonKeepAlivePacketReceived,
PendingApplicationMessages = s.Value.PendingMessagesQueue.Count
}).ToList();
}


+ 8
- 7
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -16,7 +16,6 @@ namespace MQTTnet.Server
private MqttClientSessionsManager _clientSessionsManager;
private MqttRetainedMessagesManager _retainedMessagesManager;
private CancellationTokenSource _cancellationTokenSource;
private IMqttServerOptions _options;

public MqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger)
{
@@ -35,6 +34,8 @@ namespace MQTTnet.Server

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public IMqttServerOptions Options { get; private set; }

public Task<IList<ConnectedMqttClient>> GetConnectedClientsAsync()
{
return _clientSessionsManager.GetConnectedClientsAsync();
@@ -70,16 +71,16 @@ namespace MQTTnet.Server

public async Task StartAsync(IMqttServerOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
Options = options ?? throw new ArgumentNullException(nameof(options));

if (_cancellationTokenSource != null) throw new InvalidOperationException("The server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

_retainedMessagesManager = new MqttRetainedMessagesManager(_options, _logger);
_retainedMessagesManager = new MqttRetainedMessagesManager(Options, _logger);
await _retainedMessagesManager.LoadMessagesAsync();

_clientSessionsManager = new MqttClientSessionsManager(_options, _retainedMessagesManager, _logger)
_clientSessionsManager = new MqttClientSessionsManager(Options, _retainedMessagesManager, _logger)
{
ClientConnectedCallback = OnClientConnected,
ClientDisconnectedCallback = OnClientDisconnected,
@@ -91,7 +92,7 @@ namespace MQTTnet.Server
foreach (var adapter in _adapters)
{
adapter.ClientAccepted += OnClientAccepted;
await adapter.StartAsync(_options);
await adapter.StartAsync(Options);
}

_logger.Info<MqttServer>("Started.");
@@ -142,7 +143,7 @@ namespace MQTTnet.Server
_logger.Info<MqttServer>("Client '{0}': Disconnected.", client.ClientId);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client));
}
private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter)
{
ClientSubscribedTopic?.Invoke(this, new MqttClientSubscribedTopicEventArgs(clientId, topicFilter));
@@ -152,7 +153,7 @@ namespace MQTTnet.Server
{
ClientUnsubscribedTopic?.Invoke(this, new MqttClientUnsubscribedTopicEventArgs(clientId, topicFilter));
}
private void OnApplicationMessageReceived(string clientId, MqttApplicationMessage applicationMessage)
{
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(clientId, applicationMessage));


+ 1
- 0
README.md View File

@@ -43,6 +43,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Retained messages are supported including persisting via interface methods (own implementation required)
* WebSockets supported (via ASP.NET Core 2.0, separate nuget)
* A custom message interceptor can be added which allows transforming or extending every received application message
* Validate subscriptions and deny subscribing of certain topics depending on requesting clients

## Supported frameworks



+ 66
- 0
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs View File

@@ -0,0 +1,66 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using MQTTnet.Server;

namespace MQTTnet.Core.Tests
{
[TestClass]
public class MqttKeepAliveMonitorTests
{
[TestMethod]
public void KeepAlive_Timeout()
{
var timeoutCalledCount = 0;

var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate
{
timeoutCalledCount++;
return Task.FromResult(0);
}, new MqttNetLogger());

Assert.AreEqual(0, timeoutCalledCount);

monitor.Start(1, CancellationToken.None);

Assert.AreEqual(0, timeoutCalledCount);

Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.

Assert.AreEqual(1, timeoutCalledCount);
}

[TestMethod]
public void KeepAlive_NoTimeout()
{
var timeoutCalledCount = 0;

var monitor = new MqttClientKeepAliveMonitor(string.Empty, delegate
{
timeoutCalledCount++;
return Task.FromResult(0);
}, new MqttNetLogger());

Assert.AreEqual(0, timeoutCalledCount);

monitor.Start(1, CancellationToken.None);

Assert.AreEqual(0, timeoutCalledCount);

// Simulate traffic.
Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
monitor.PacketReceived(new MqttPublishPacket());
Thread.Sleep(1000);
monitor.PacketReceived(new MqttPublishPacket());
Thread.Sleep(1000);

Assert.AreEqual(0, timeoutCalledCount);

Thread.Sleep(2000);

Assert.AreEqual(1, timeoutCalledCount);
}
}
}

Loading…
Cancel
Save