Browse Source

Fix keep alive checks in server

release/3.x.x
Christian Kratky 7 years ago
parent
commit
644e948489
9 changed files with 98 additions and 39 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +1
    -1
      Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
  3. +1
    -6
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  4. +22
    -14
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  5. +64
    -13
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  6. +4
    -2
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  7. +0
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs
  8. +3
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  9. +2
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -22,6 +22,7 @@
* [Server] Added packet statistics for the connected clients.
* [Server] Fixed a security issue which sends retained packages to a failed subscription.
* [Server] Fixed the response (MaximumQoS) of a subscription (Thanks to @redbeans2017).
* [Server] The keep alive timeouts are now checked for every client (Thanks to @RainerMueller82).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 1
- 1
Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs View File

@@ -8,7 +8,7 @@ using MQTTnet.Server;

namespace MQTTnet.AspNetCore
{
public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable
public sealed class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable
{
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;



+ 1
- 6
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs View File

@@ -347,13 +347,8 @@ namespace MQTTnet.Client
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
{
return;
}

await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)


+ 22
- 14
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -10,9 +10,9 @@ using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public sealed class MqttClientPendingMessagesQueue
public sealed class MqttClientPendingMessagesQueue : IDisposable
{
private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
private readonly BlockingCollection<MqttBasePacket> _queue = new BlockingCollection<MqttBasePacket>();
private readonly IMqttServerOptions _options;
private readonly MqttClientSession _session;
private readonly IMqttNetLogger _logger;
@@ -33,24 +33,24 @@ namespace MQTTnet.Server
return;
}

Task.Run(async () => await SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken).ConfigureAwait(false);
Task.Run(async () => await SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public void Enqueue(MqttPublishPacket publishPacket)
public void Enqueue(MqttBasePacket packet)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
if (packet == null) throw new ArgumentNullException(nameof(packet));

_pendingPublishPackets.Add(publishPacket);
_queue.Add(packet);
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
}

private async Task SendPendingPublishPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
await SendPendingPublishPacketAsync(adapter, cancellationToken);
await SendQueuedPacketAsync(adapter, cancellationToken);
}
}
catch (OperationCanceledException)
@@ -62,12 +62,12 @@ namespace MQTTnet.Server
}
}

private async Task SendPendingPublishPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
private async Task SendQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
MqttPublishPacket packet = null;
MqttBasePacket packet = null;
try
{
packet = _pendingPublishPackets.Take(cancellationToken);
packet = _queue.Take(cancellationToken);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);

_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
@@ -90,14 +90,22 @@ namespace MQTTnet.Server
_logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
}

if (packet != null && packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
if (packet is MqttPublishPacket publishPacket)
{
packet.Dup = true;
_pendingPublishPackets.Add(packet, CancellationToken.None);
if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
publishPacket.Dup = true;
_queue.Add(packet, CancellationToken.None);
}
}

await _session.StopAsync();
}
}

public void Dispose()
{
_queue?.Dispose();
}
}
}

+ 64
- 13
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -12,7 +12,7 @@ using MQTTnet.Serializer;

namespace MQTTnet.Server
{
public sealed class MqttClientSession
public sealed class MqttClientSession : IDisposable
{
private readonly Stopwatch _lastPacketReceivedTracker = Stopwatch.StartNew();
private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = Stopwatch.StartNew();
@@ -57,22 +57,29 @@ namespace MQTTnet.Server

public bool IsConnected => _adapter != null;

public async Task RunAsync(MqttApplicationMessage willMessage, IMqttChannelAdapter adapter)
public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
{
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

try
{
_lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart();

var cancellationTokenSource = new CancellationTokenSource();

_willMessage = willMessage;
_willMessage = connectPacket.WillMessage;
_adapter = adapter;
_cancellationTokenSource = cancellationTokenSource;

_pendingMessagesQueue.Start(adapter, cancellationTokenSource.Token);

_lastPacketReceivedTracker.Restart();
_lastNonKeepAlivePacketReceivedTracker.Restart();

if (connectPacket.KeepAlivePeriod > 0)
{
StartCheckingKeepAliveTimeout(TimeSpan.FromSeconds(connectPacket.KeepAlivePeriod), cancellationTokenSource.Token);
}

await ReceivePacketsAsync(adapter, cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
@@ -136,6 +143,12 @@ namespace MQTTnet.Server
_pendingMessagesQueue.Enqueue(publishPacket);
}

public void Dispose()
{
_pendingMessagesQueue?.Dispose();
_cancellationTokenSource?.Dispose();
}

private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
{
try
@@ -143,6 +156,14 @@ namespace MQTTnet.Server
while (!cancellationToken.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

_lastPacketReceivedTracker.Restart();

if (!(packet is MqttPingReqPacket))
{
_lastNonKeepAlivePacketReceivedTracker.Restart();
}

await ProcessReceivedPacketAsync(adapter, packet, cancellationToken).ConfigureAwait(false);
}
}
@@ -163,13 +184,6 @@ namespace MQTTnet.Server

private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
{
_lastPacketReceivedTracker.Restart();

if (!(packet is MqttPingReqPacket))
{
_lastNonKeepAlivePacketReceivedTracker.Restart();
}

if (packet is MqttPublishPacket publishPacket)
{
return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
@@ -292,5 +306,42 @@ namespace MQTTnet.Server
var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier };
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response);
}

private void StartCheckingKeepAliveTimeout(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
{
Task.Run(
async () => await CheckKeepAliveTimeoutAsync(keepAlivePeriod, cancellationToken).ConfigureAwait(false)
, cancellationToken);
}

private async Task CheckKeepAliveTimeoutAsync(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
if (_lastPacketReceivedTracker.Elapsed.TotalSeconds > keepAlivePeriod.TotalSeconds * 1.5D)
{
_logger.Warning<MqttClientSession>("Client '{0}': Did not receive any packet or keep alive signal.", ClientId);
await StopAsync();
return;
}

await Task.Delay(keepAlivePeriod, cancellationToken);
}
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", ClientId);
}
finally
{
_logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", ClientId);
}
}
}
}

+ 4
- 2
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -70,7 +70,7 @@ namespace MQTTnet.Server
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
});

await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
await clientSession.Session.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -210,9 +210,11 @@ namespace MQTTnet.Server
if (isSessionPresent)
{
if (connectPacket.CleanSession)
{
{
_sessions.Remove(connectPacket.ClientId);

await clientSession.StopAsync().ConfigureAwait(false);
clientSession.Dispose();
clientSession = null;

_logger.Trace<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId);


+ 0
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttServer.cs View File

@@ -71,7 +71,6 @@ namespace MQTTnet.Server
}

_logger.Info<MqttServer>("Started.");

Started?.Invoke(this, new MqttServerStartedEventArgs());
}



+ 3
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml View File

@@ -27,7 +27,9 @@
<TextBox x:Name="ClientId"></TextBox>
<TextBlock>Clean session:</TextBlock>
<CheckBox x:Name="CleanSession" IsChecked="True"></CheckBox>

<TextBlock>Keep alive interval:</TextBlock>
<TextBox x:Name="KeepAliveInterval" Text="5"></TextBox>
<StackPanel Orientation="Horizontal">
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton>
<RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton>


+ 2
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -103,7 +103,8 @@ namespace MQTTnet.TestApp.UniversalWindows
};

options.CleanSession = CleanSession.IsChecked == true;

options.KeepAlivePeriod = TimeSpan.FromSeconds(double.Parse(KeepAliveInterval.Text));
try
{
if (_mqttClient != null)


Loading…
Cancel
Save