From 6b9015a9287a85182958339ae02cd18582f0be54 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 25 Jul 2018 20:18:27 +0200 Subject: [PATCH] Fix order of ClientConnected and ClientDisconnected events. --- Source/MQTTnet/Server/IMqttClientSession.cs | 2 +- Source/MQTTnet/Server/MqttClientSession.cs | 6 +-- .../Server/MqttClientSessionsManager.cs | 8 ++-- Tests/MQTTnet.Core.Tests/MqttClientTests.cs | 7 ++- .../MqttKeepAliveMonitorTests.cs | 4 +- Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 48 +++++++++++++++++++ 6 files changed, 62 insertions(+), 13 deletions(-) diff --git a/Source/MQTTnet/Server/IMqttClientSession.cs b/Source/MQTTnet/Server/IMqttClientSession.cs index a94ad18..9fdb0eb 100644 --- a/Source/MQTTnet/Server/IMqttClientSession.cs +++ b/Source/MQTTnet/Server/IMqttClientSession.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Server void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttPublishPacket publishPacket); void ClearPendingApplicationMessages(); - Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); + Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter); void Stop(MqttClientDisconnectType disconnectType); Task SubscribeAsync(IList topicFilters); diff --git a/Source/MQTTnet/Server/MqttClientSession.cs b/Source/MQTTnet/Server/MqttClientSession.cs index 7879ea6..fa90482 100644 --- a/Source/MQTTnet/Server/MqttClientSession.cs +++ b/Source/MQTTnet/Server/MqttClientSession.cs @@ -65,7 +65,7 @@ namespace MQTTnet.Server status.LastNonKeepAlivePacketReceived = _keepAliveMonitor.LastNonKeepAlivePacketReceived; } - public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); if (adapter == null) throw new ArgumentNullException(nameof(adapter)); @@ -129,8 +129,6 @@ namespace MQTTnet.Server _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; } - - return _wasCleanDisconnect; } public void Stop(MqttClientDisconnectType type) @@ -157,6 +155,8 @@ namespace MQTTnet.Server finally { _logger.Info("Client '{0}': Session stopped.", ClientId); + + _sessionsManager.Server.OnClientDisconnected(ClientId, _wasCleanDisconnect); } } diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 48732a6..c139151 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -125,6 +125,7 @@ namespace MQTTnet.Server { _sessions.Remove(clientId); } + _logger.Verbose("Session for client '{0}' deleted.", clientId); } @@ -207,8 +208,7 @@ namespace MQTTnet.Server private async Task RunSession(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) { var clientId = string.Empty; - var wasCleanDisconnect = false; - + try { var firstPacket = await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false); @@ -253,7 +253,7 @@ namespace MQTTnet.Server Server.OnClientConnected(clientId); - wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); + await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -278,8 +278,6 @@ namespace MQTTnet.Server { DeleteSession(clientId); } - - Server.OnClientDisconnected(clientId, wasCleanDisconnect); } } diff --git a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs index 1334a80..f629f8f 100644 --- a/Tests/MQTTnet.Core.Tests/MqttClientTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttClientTests.cs @@ -1,15 +1,18 @@ -using System.Net.Sockets; +using System; +using System.Net.Sockets; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; +using MQTTnet.Diagnostics; using MQTTnet.Exceptions; +using MQTTnet.Implementations; +using MQTTnet.Server; namespace MQTTnet.Core.Tests { [TestClass] public class MqttClientTests { - [TestMethod] public async Task ClientDisconnectException() { diff --git a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs index 267b7bc..b18904a 100644 --- a/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttKeepAliveMonitorTests.cs @@ -60,7 +60,7 @@ namespace MQTTnet.Core.Tests { public string ClientId { get; } - public int StopCalledCount { get; set; } + public int StopCalledCount { get; private set; } public void FillStatus(MqttClientSessionStatus status) { @@ -77,7 +77,7 @@ namespace MQTTnet.Core.Tests throw new NotSupportedException(); } - public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) + public Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) { throw new NotSupportedException(); } diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 95bb67d..58b27ba 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -484,6 +484,54 @@ namespace MQTTnet.Core.Tests Assert.IsTrue(bodyIsMatching); } + [TestMethod] + public async Task MqttServer_SameClientIdConnectDisconnectEventOrder() + { + var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger()); + var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); + + var connectedClient = false; + var connecteCalledBeforeConnectedClients = false; + + s.ClientConnected += (_, __) => + { + connecteCalledBeforeConnectedClients |= connectedClient; + connectedClient = true; + }; + + s.ClientDisconnected += (_, __) => + { + connectedClient = false; + }; + + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer("localhost") + .WithClientId(Guid.NewGuid().ToString()) + .Build(); + + await s.StartAsync(new MqttServerOptions()); + + var c1 = new MqttFactory().CreateMqttClient(); + var c2 = new MqttFactory().CreateMqttClient(); + + await c1.ConnectAsync(clientOptions); + + await Task.Delay(100); + + await c2.ConnectAsync(clientOptions); + + await Task.Delay(100); + + await c1.DisconnectAsync(); + await c2.DisconnectAsync(); + + await s.StopAsync(); + + await Task.Delay(100); + + Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); + } + private class TestStorage : IMqttServerStorage { public IList Messages = new List();