Browse Source

Add support for reset of connection statistics. Refactor keep alive monitor.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
b0ad6bda70
8 changed files with 60 additions and 23 deletions
  1. +6
    -0
      Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
  2. +2
    -0
      Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
  3. +7
    -2
      Source/MQTTnet/Server/MqttClientConnection.cs
  4. +10
    -9
      Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs
  5. +3
    -1
      Source/MQTTnet/Server/Status/IMqttClientStatus.cs
  6. +5
    -0
      Source/MQTTnet/Server/Status/MqttClientStatus.cs
  7. +4
    -0
      Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs
  8. +23
    -11
      Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs

+ 6
- 0
Source/MQTTnet.AspnetCore/MqttConnectionContext.cs View File

@@ -145,6 +145,12 @@ namespace MQTTnet.AspNetCore
return null; return null;
} }


public void ResetStatistics()
{
BytesReceived = 0;
BytesSent = 0;
}

public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken) public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{ {
var formatter = PacketFormatterAdapter; var formatter = PacketFormatterAdapter;


+ 2
- 0
Source/MQTTnet/Adapter/IMqttChannelAdapter.cs View File

@@ -32,5 +32,7 @@ namespace MQTTnet.Adapter
Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken); Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken);


Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken); Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken);

void ResetStatistics();
} }
} }

+ 7
- 2
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -14,7 +14,7 @@ using MQTTnet.Server.Status;


namespace MQTTnet.Server namespace MQTTnet.Server
{ {
public class MqttClientConnection : IMqttClientSession, IDisposable
public class MqttClientConnection : IDisposable
{ {
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
@@ -64,7 +64,7 @@ namespace MQTTnet.Server
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttClientConnection)); _logger = logger.CreateChildLogger(nameof(MqttClientConnection));


_keepAliveMonitor = new MqttClientKeepAliveMonitor(this, _logger);
_keepAliveMonitor = new MqttClientKeepAliveMonitor(_connectPacket.ClientId, StopAsync, _logger);


_connectedTimestamp = DateTime.UtcNow; _connectedTimestamp = DateTime.UtcNow;
_lastPacketReceivedTimestamp = _connectedTimestamp; _lastPacketReceivedTimestamp = _connectedTimestamp;
@@ -86,6 +86,11 @@ namespace MQTTnet.Server
} }
} }


public void ResetStatistics()
{
_channelAdapter.ResetStatistics();
}

public void FillStatus(MqttClientStatus status) public void FillStatus(MqttClientStatus status)
{ {
status.ClientId = ClientId; status.ClientId = ClientId;


+ 10
- 9
Source/MQTTnet/Server/MqttClientKeepAliveMonitor.cs View File

@@ -10,17 +10,18 @@ namespace MQTTnet.Server
{ {
private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch(); private readonly Stopwatch _lastPacketReceivedTracker = new Stopwatch();


private readonly IMqttClientSession _clientSession;
private readonly string _clientId;
private readonly Func<Task> _keepAliveElapsedCallback;
private readonly IMqttNetChildLogger _logger; private readonly IMqttNetChildLogger _logger;


private bool _isPaused; private bool _isPaused;


public MqttClientKeepAliveMonitor(IMqttClientSession clientSession, IMqttNetChildLogger logger)
public MqttClientKeepAliveMonitor(string clientId, Func<Task> keepAliveElapsedCallback, IMqttNetChildLogger logger)
{ {
_clientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
_keepAliveElapsedCallback = keepAliveElapsedCallback ?? throw new ArgumentNullException(nameof(keepAliveElapsedCallback));
if (logger == null) throw new ArgumentNullException(nameof(logger)); if (logger == null) throw new ArgumentNullException(nameof(logger));

_clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));

_logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor)); _logger = logger.CreateChildLogger(nameof(MqttClientKeepAliveMonitor));
} }


@@ -62,8 +63,8 @@ namespace MQTTnet.Server
// If the client sends 1 sec. the server will allow up to 1.5 seconds. // If the client sends 1 sec. the server will allow up to 1.5 seconds.
if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D) if (!_isPaused && _lastPacketReceivedTracker.Elapsed.TotalSeconds >= keepAlivePeriod * 1.5D)
{ {
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientSession.ClientId);
await _clientSession.StopAsync().ConfigureAwait(false);
_logger.Warning(null, "Client '{0}': Did not receive any packet or keep alive signal.", _clientId);
await _keepAliveElapsedCallback().ConfigureAwait(false);


return; return;
} }
@@ -80,11 +81,11 @@ namespace MQTTnet.Server
} }
catch (Exception exception) catch (Exception exception)
{ {
_logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientSession.ClientId);
_logger.Error(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", _clientId);
} }
finally finally
{ {
_logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientSession.ClientId);
_logger.Verbose("Client '{0}': Stopped checking keep alive timeout.", _clientId);
} }
} }
} }


+ 3
- 1
Source/MQTTnet/Server/Status/IMqttClientStatus.cs View File

@@ -29,7 +29,9 @@ namespace MQTTnet.Server.Status
long BytesSent { get; } long BytesSent { get; }


long BytesReceived { get; } long BytesReceived { get; }
Task DisconnectAsync(); Task DisconnectAsync();

void ResetStatistics();
} }
} }

+ 5
- 0
Source/MQTTnet/Server/Status/MqttClientStatus.cs View File

@@ -43,5 +43,10 @@ namespace MQTTnet.Server.Status
{ {
return _connection.StopAsync(); return _connection.StopAsync();
} }

public void ResetStatistics()
{
_connection.ResetStatistics();
}
} }
} }

+ 4
- 0
Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs View File

@@ -69,6 +69,10 @@ namespace MQTTnet.Tests.Mockups
}, cancellationToken); }, cancellationToken);
} }


public void ResetStatistics()
{
}

private void EnqueuePacketInternal(MqttBasePacket packet) private void EnqueuePacketInternal(MqttBasePacket packet)
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));


+ 23
- 11
Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitor_Tests.cs View File

@@ -16,31 +16,43 @@ namespace MQTTnet.Tests
[TestMethod] [TestMethod]
public void KeepAlive_Timeout() public void KeepAlive_Timeout()
{ {
var clientSession = new TestClientSession();
var monitor = new MqttClientKeepAliveMonitor(clientSession, new MqttNetLogger().CreateChildLogger());
var counter = 0;


Assert.AreEqual(0, clientSession.StopCalledCount);
var monitor = new MqttClientKeepAliveMonitor("", () =>
{
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());

Assert.AreEqual(0, counter);


monitor.Start(1, CancellationToken.None); monitor.Start(1, CancellationToken.None);


Assert.AreEqual(0, clientSession.StopCalledCount);
Assert.AreEqual(0, counter);


Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. Thread.Sleep(2000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.


Assert.AreEqual(1, clientSession.StopCalledCount);
Assert.AreEqual(1, counter);
} }


[TestMethod] [TestMethod]
public void KeepAlive_NoTimeout() public void KeepAlive_NoTimeout()
{ {
var clientSession = new TestClientSession();
var monitor = new MqttClientKeepAliveMonitor(clientSession, new MqttNetLogger().CreateChildLogger());
var counter = 0;

var monitor = new MqttClientKeepAliveMonitor("", () =>
{
counter++;
return Task.CompletedTask;
},
new MqttNetLogger().CreateChildLogger());


Assert.AreEqual(0, clientSession.StopCalledCount);
Assert.AreEqual(0, counter);


monitor.Start(1, CancellationToken.None); monitor.Start(1, CancellationToken.None);


Assert.AreEqual(0, clientSession.StopCalledCount);
Assert.AreEqual(0, counter);


// Simulate traffic. // Simulate traffic.
Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification. Thread.Sleep(1000); // Internally the keep alive timeout is multiplied with 1.5 as per protocol specification.
@@ -49,11 +61,11 @@ namespace MQTTnet.Tests
monitor.PacketReceived(); monitor.PacketReceived();
Thread.Sleep(1000); Thread.Sleep(1000);


Assert.AreEqual(0, clientSession.StopCalledCount);
Assert.AreEqual(0, counter);


Thread.Sleep(2000); Thread.Sleep(2000);


Assert.AreEqual(1, clientSession.StopCalledCount);
Assert.AreEqual(1, counter);
} }


private class TestClientSession : IMqttClientSession private class TestClientSession : IMqttClientSession


Loading…
Cancel
Save