From f7db00c08e49216ca1706ca140d4a359c4b596b8 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sun, 11 Oct 2020 16:41:13 +0200 Subject: [PATCH 1/3] improved behavior when multiple connections fight over a session --- Source/MQTTnet/Client/MqttClient.cs | 8 +++- Source/MQTTnet/Server/MqttClientConnection.cs | 9 ++++ .../Server/MqttClientSessionsManager.cs | 45 ++++++++++--------- Tests/MQTTnet.Core.Tests/Session_Tests.cs | 36 +++++++++++++++ 4 files changed, 76 insertions(+), 22 deletions(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 8a045be..51a3612 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -561,12 +561,16 @@ namespace MQTTnet.Client { await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false); } - else if (packet is MqttDisconnectPacket) + else if (packet is MqttDisconnectPacket disc) { // Also dispatch disconnect to waiting threads to generate a proper exception. _packetDispatcher.Dispatch(packet); - await DisconnectAsync(null, cancellationToken).ConfigureAwait(false); + await DisconnectAsync(new MqttClientDisconnectOptions() + { + // todo conversion + ReasonCode = disc.ReasonCode + }, cancellationToken).ConfigureAwait(false); } else if (packet is MqttAuthPacket authPacket) { diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index bf51269..57fb63e 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -257,6 +257,15 @@ namespace MQTTnet.Server Session.WillMessage = null; } + if (_isTakeover) + { + // dont use SendAsync here _cancellationToken is already cancelled + await _channelAdapter.SendPacketAsync(new MqttDisconnectPacket() + { + ReasonCode = MqttDisconnectReasonCode.SessionTakenOver + }, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false); + } + _packetDispatcher.Reset(); _channelAdapter.ReadingPacketStartedCallback = null; diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 48f6980..6935bcb 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -316,7 +316,11 @@ namespace MQTTnet.Server { if (clientId != null) { - _connections.TryRemove(clientId, out _); + // in case it is a takeover _connections already contains the new connection + if (disconnectType != MqttClientDisconnectType.Takeover) + { + _connections.TryRemove(clientId, out _); + } if (!_options.EnablePersistentSessions) { @@ -364,38 +368,39 @@ namespace MQTTnet.Server { using (await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false)) { - var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var session); - - var isConnectionPresent = _connections.TryGetValue(connectPacket.ClientId, out var existingConnection); - if (isConnectionPresent) + var session = _sessions.AddOrUpdate(connectPacket.ClientId, key => { - await existingConnection.StopAsync(true).ConfigureAwait(false); - } - - if (isSessionPresent) + _logger.Verbose("Created a new session for client '{0}'.", key); + return new MqttClientSession(key, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger); + }, (key, existingSession) => { if (connectPacket.CleanSession) { - session = null; - _logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId); + return new MqttClientSession(key, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger); } else { _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); + return existingSession; } - } - - if (session == null) - { - session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger); - _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); - } + }); var connection = new MqttClientConnection(connectPacket, channelAdapter, session, _options, this, _retainedMessagesManager, onStart, onStop, _rootLogger); + MqttClientConnection existingConnection = null; + _connections.AddOrUpdate(connectPacket.ClientId, key => + { + return connection; + }, (key, tempexistingConnection) => + { + existingConnection = tempexistingConnection; + return connection; + }); - _connections[connection.ClientId] = connection; - _sessions[session.ClientId] = session; + if (existingConnection != null) + { + await existingConnection.StopAsync(true).ConfigureAwait(false); + } return connection; } diff --git a/Tests/MQTTnet.Core.Tests/Session_Tests.cs b/Tests/MQTTnet.Core.Tests/Session_Tests.cs index 973fbf3..1f4c418 100644 --- a/Tests/MQTTnet.Core.Tests/Session_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Session_Tests.cs @@ -1,5 +1,6 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Client; +using MQTTnet.Client.Options; using MQTTnet.Client.Subscribing; using MQTTnet.Server; using MQTTnet.Tests.Mockups; @@ -84,5 +85,40 @@ namespace MQTTnet.Tests Assert.AreEqual(true, session.Items["can_subscribe_x"]); } } + + + [TestMethod] + public async Task Manage_Session_MaxParallel() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + testEnvironment.IgnoreClientLogErrors = true; + var serverOptions = new MqttServerOptionsBuilder(); + await testEnvironment.StartServerAsync(serverOptions); + + var options = new MqttClientOptionsBuilder() + .WithClientId("1") + ; + + var clients = await Task.WhenAll(Enumerable.Range(0, 10) + .Select(i => TryConnect(testEnvironment, options))); + + var connectedClients = clients.Where(c => c?.IsConnected ?? false).ToList(); + + Assert.AreEqual(1, connectedClients.Count); + } + } + + private async Task TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options) + { + try + { + return await testEnvironment.ConnectClientAsync(options); + } + catch (System.Exception) + { + return null; + } + } } } From d7b552fce23ecc41502639e70bdf17b36f32374c Mon Sep 17 00:00:00 2001 From: JanEggers Date: Tue, 13 Oct 2020 15:57:21 +0200 Subject: [PATCH 2/3] renaming and added cast --- Build/MQTTnet.nuspec | 4 +++- Source/MQTTnet/Client/MqttClient.cs | 4 ++-- Source/MQTTnet/Server/MqttClientSessionsManager.cs | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index d855819..fd4bc2e 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -14,8 +14,10 @@ * [Client] Fixed wrong value for "ClientWasConnected" in "MqttClientDisconnectedEventArgs" #976 (thanks to @dbeinder). * [Client] Added direct support for Amazon AWS connections (requires .NET Core 3.1) (thanks to @henning-krause). +* [Client] handle disconnect package and propagate disconnect reason code (thanks to @JanEggers) * [ManagedClient] Exposed the internal MQTT client. -* [Server] Added client message queue interceptor for QoS level (tahnks to @msallin). +* [Server] Added client message queue interceptor for QoS level (thanks to @msallin). +* [Server] improved behavior when multiple connections fight over a session (thanks to @JanEggers) Copyright Christian Kratky 2016-2020 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 51a3612..44cd85c 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -561,7 +561,7 @@ namespace MQTTnet.Client { await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false); } - else if (packet is MqttDisconnectPacket disc) + else if (packet is MqttDisconnectPacket disconnectPacket) { // Also dispatch disconnect to waiting threads to generate a proper exception. _packetDispatcher.Dispatch(packet); @@ -569,7 +569,7 @@ namespace MQTTnet.Client await DisconnectAsync(new MqttClientDisconnectOptions() { // todo conversion - ReasonCode = disc.ReasonCode + ReasonCode = (MqttClientDisconnectReason)(disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.UnspecifiedError) }, cancellationToken).ConfigureAwait(false); } else if (packet is MqttAuthPacket authPacket) diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 6935bcb..84fa725 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -391,9 +391,9 @@ namespace MQTTnet.Server _connections.AddOrUpdate(connectPacket.ClientId, key => { return connection; - }, (key, tempexistingConnection) => + }, (key, tempExistingConnection) => { - existingConnection = tempexistingConnection; + existingConnection = tempExistingConnection; return connection; }); From 2eaebe0e6a152d3d493512d992911e26f3dc262c Mon Sep 17 00:00:00 2001 From: JanEggers Date: Tue, 13 Oct 2020 15:59:39 +0200 Subject: [PATCH 3/3] removed todo --- Source/MQTTnet/Client/MqttClient.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 44cd85c..1923a2e 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -568,7 +568,6 @@ namespace MQTTnet.Client await DisconnectAsync(new MqttClientDisconnectOptions() { - // todo conversion ReasonCode = (MqttClientDisconnectReason)(disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.UnspecifiedError) }, cancellationToken).ConfigureAwait(false); }