From b0ad6bda706fb66c589c2689d75ee9243d5f0fb5 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Thu, 6 Jun 2019 21:12:32 +0200 Subject: [PATCH] Add support for reset of connection statistics. Refactor keep alive monitor. --- .../MqttConnectionContext.cs | 6 ++++ Source/MQTTnet/Adapter/IMqttChannelAdapter.cs | 2 ++ Source/MQTTnet/Server/MqttClientConnection.cs | 9 +++-- .../Server/MqttClientKeepAliveMonitor.cs | 19 ++++++----- .../Server/Status/IMqttClientStatus.cs | 4 ++- .../MQTTnet/Server/Status/MqttClientStatus.cs | 5 +++ .../Mockups/TestMqttCommunicationAdapter.cs | 4 +++ .../MqttKeepAliveMonitor_Tests.cs | 34 +++++++++++++------ 8 files changed, 60 insertions(+), 23 deletions(-) diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index fb5f4f1..f50d85d 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -145,6 +145,12 @@ namespace MQTTnet.AspNetCore return null; } + public void ResetStatistics() + { + BytesReceived = 0; + BytesSent = 0; + } + public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) { var formatter = PacketFormatterAdapter; diff --git a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs index 118761f..a41e8c1 100644 --- a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs @@ -32,5 +32,7 @@ namespace MQTTnet.Adapter Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken); Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken); + + void ResetStatistics(); } } diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 6cc4bc5..5ede2c6 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -14,7 +14,7 @@ using MQTTnet.Server.Status; namespace MQTTnet.Server { - public class MqttClientConnection : IMqttClientSession, IDisposable + public class MqttClientConnection : IDisposable { private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); @@ -64,7 +64,7 @@ namespace MQTTnet.Server if (logger == null) throw new ArgumentNullException(nameof(logger)); _logger = logger.CreateChildLogger(nameof(MqttClientConnection)); - _keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger); + _keepAliveMonitor = new MqttClientKeepAliveMonitor(_connectPacket.ClientId, StopAsync, _logger); _connectedTimestamp = DateTime.UtcNow; _lastPacketReceivedTimestamp = _connectedTimestamp; @@ -86,6 +86,11 @@ namespace MQTTnet.Server } } + public void ResetStatistics() + { + _channelAdapter.ResetStatistics(); + } + public void FillStatus(MqttClientStatus status) { status.ClientId = ClientId; diff --git a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs index 0ca41ab..32effed 100644 --- a/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs +++ b/Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs @@ -10,17 +10,18 @@ namespace MQTTnet.Server { private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); - private readonly IMqttClientSession _clientSession; + private readonly string _clientId; + private readonly Func _keepAliveElapsedCallback; private readonly IMqttNetChildLogger _logger; private bool _isPaused; - public MqttClientKeepAliveMonitor(IMqttClientSession clientSession, IMqttNetChildLogger logger) + public MqttClientKeepAliveMonitor(string clientId, Func keepAliveElapsedCallback, IMqttNetChildLogger logger) { + _clientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); + _keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback)); + if (logger == null) throw new ArgumentNullException(nameof(logger)); - - _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); - _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor)); } @@ -62,8 +63,8 @@ namespace MQTTnet.Server // If the client sends 1 sec. the server will allow up to 1.5 seconds. if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D) { - _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId); - await _clientSession.StopAsync().ConfigureAwait(false); + _logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId); + await _keepAliveElapsedCallback().ConfigureAwait(false); return; } @@ -80,11 +81,11 @@ namespace MQTTnet.Server } catch (Exception exception) { - _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientSession.ClientId); + _logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId); } finally { - _logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientSession.ClientId); + _logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientId); } } } diff --git a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs index 714b00d..bbf4799 100644 --- a/Source/MQTTnet/Server/Status/IMqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/IMqttClientStatus.cs @@ -29,7 +29,9 @@ namespace MQTTnet.Server.Status long BytesSent { get; } long BytesReceived { get; } - + Task DisconnectAsync(); + + void ResetStatistics(); } } diff --git a/Source/MQTTnet/Server/Status/MqttClientStatus.cs b/Source/MQTTnet/Server/Status/MqttClientStatus.cs index 05cd202..553795f 100644 --- a/Source/MQTTnet/Server/Status/MqttClientStatus.cs +++ b/Source/MQTTnet/Server/Status/MqttClientStatus.cs @@ -43,5 +43,10 @@ namespace MQTTnet.Server.Status { return _connection.StopAsync(); } + + public void ResetStatistics() + { + _connection.ResetStatistics(); + } } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs index 9aa99e6..07fff2a 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs @@ -69,6 +69,10 @@ namespace MQTTnet.Tests.Mockups }, cancellationToken); } + public void ResetStatistics() + { + } + private void EnqueuePacketInternal(MqttBasePacket packet) { if (packet == null) throw new ArgumentNullException(nameof(packet)); diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs index ea2ab0f..adf9a2b 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs @@ -16,31 +16,43 @@ namespace MQTTnet.Tests [TestMethod] public void KeepAlive_Timeout() { - var clientSession = new TestClientSession(); - var monitor = new MqttClientKeepAliveMonitor(clientSession, new MqttNetLogger().CreateChildLogger()); + var counter = 0; - Assert.AreEqual(0, clientSession.StopCalledCount); + var monitor = new MqttClientKeepAliveMonitor("", () => + { + counter++; + return Task.CompletedTask; + }, + new MqttNetLogger().CreateChildLogger()); + + Assert.AreEqual(0, counter); monitor.Start(1, CancellationToken.None); - Assert.AreEqual(0, clientSession.StopCalledCount); + Assert.AreEqual(0, counter); Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. - Assert.AreEqual(1, clientSession.StopCalledCount); + Assert.AreEqual(1, counter); } [TestMethod] public void KeepAlive_NoTimeout() { - var clientSession = new TestClientSession(); - var monitor = new MqttClientKeepAliveMonitor(clientSession, new MqttNetLogger().CreateChildLogger()); + var counter = 0; + + var monitor = new MqttClientKeepAliveMonitor("", () => + { + counter++; + return Task.CompletedTask; + }, + new MqttNetLogger().CreateChildLogger()); - Assert.AreEqual(0, clientSession.StopCalledCount); + Assert.AreEqual(0, counter); monitor.Start(1, CancellationToken.None); - Assert.AreEqual(0, clientSession.StopCalledCount); + Assert.AreEqual(0, counter); // Simulate traffic. Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. @@ -49,11 +61,11 @@ namespace MQTTnet.Tests monitor.PacketReceived(); Thread.Sleep(1000); - Assert.AreEqual(0, clientSession.StopCalledCount); + Assert.AreEqual(0, counter); Thread.Sleep(2000); - Assert.AreEqual(1, clientSession.StopCalledCount); + Assert.AreEqual(1, counter); } private class TestClientSession : IMqttClientSession