Browse Source

Merge pull request #1004 from JanEggers/fix#990

improved behavior when multiple connections fight over a session
release/3.x.x
Christian 4 years ago
committed by GitHub
parent
commit
5d73de2a86
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 23 deletions
  1. +3
    -1
      Build/MQTTnet.nuspec
  2. +5
    -2
      Source/MQTTnet/Client/MqttClient.cs
  3. +9
    -0
      Source/MQTTnet/Server/MqttClientConnection.cs
  4. +25
    -20
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  5. +36
    -0
      Tests/MQTTnet.Core.Tests/Session_Tests.cs

+ 3
- 1
Build/MQTTnet.nuspec View File

@@ -14,8 +14,10 @@
<releaseNotes>
* [Client] Fixed wrong value for "ClientWasConnected" in "MqttClientDisconnectedEventArgs" #976 (thanks to @dbeinder).
* [Client] Added direct support for Amazon AWS connections (requires .NET Core 3.1) (thanks to @henning-krause).
* [Client] handle disconnect package and propagate disconnect reason code (thanks to @JanEggers)
* [ManagedClient] Exposed the internal MQTT client.
* [Server] Added client message queue interceptor for QoS level (tahnks to @msallin).
* [Server] Added client message queue interceptor for QoS level (thanks to @msallin).
* [Server] improved behavior when multiple connections fight over a session (thanks to @JanEggers)
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>


+ 5
- 2
Source/MQTTnet/Client/MqttClient.cs View File

@@ -561,12 +561,15 @@ namespace MQTTnet.Client
{
await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false);
}
else if (packet is MqttDisconnectPacket)
else if (packet is MqttDisconnectPacket disconnectPacket)
{
// Also dispatch disconnect to waiting threads to generate a proper exception.
_packetDispatcher.Dispatch(packet);

await DisconnectAsync(null, cancellationToken).ConfigureAwait(false);
await DisconnectAsync(new MqttClientDisconnectOptions()
{
ReasonCode = (MqttClientDisconnectReason)(disconnectPacket.ReasonCode ?? MqttDisconnectReasonCode.UnspecifiedError)
}, cancellationToken).ConfigureAwait(false);
}
else if (packet is MqttAuthPacket authPacket)
{


+ 9
- 0
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -257,6 +257,15 @@ namespace MQTTnet.Server
Session.WillMessage = null;
}

if (_isTakeover)
{
// dont use SendAsync here _cancellationToken is already cancelled
await _channelAdapter.SendPacketAsync(new MqttDisconnectPacket()
{
ReasonCode = MqttDisconnectReasonCode.SessionTakenOver
}, TimeSpan.Zero, CancellationToken.None).ConfigureAwait(false);
}

_packetDispatcher.Reset();

_channelAdapter.ReadingPacketStartedCallback = null;


+ 25
- 20
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -316,7 +316,11 @@ namespace MQTTnet.Server
{
if (clientId != null)
{
_connections.TryRemove(clientId, out _);
// in case it is a takeover _connections already contains the new connection
if (disconnectType != MqttClientDisconnectType.Takeover)
{
_connections.TryRemove(clientId, out _);
}

if (!_options.EnablePersistentSessions)
{
@@ -364,38 +368,39 @@ namespace MQTTnet.Server
{
using (await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false))
{
var isSessionPresent = _sessions.TryGetValue(connectPacket.ClientId, out var session);

var isConnectionPresent = _connections.TryGetValue(connectPacket.ClientId, out var existingConnection);
if (isConnectionPresent)
var session = _sessions.AddOrUpdate(connectPacket.ClientId, key =>
{
await existingConnection.StopAsync(true).ConfigureAwait(false);
}

if (isSessionPresent)
_logger.Verbose("Created a new session for client '{0}'.", key);
return new MqttClientSession(key, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger);
}, (key, existingSession) =>
{
if (connectPacket.CleanSession)
{
session = null;

_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
return new MqttClientSession(key, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger);
}
else
{
_logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId);
return existingSession;
}
}

if (session == null)
{
session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _retainedMessagesManager, _rootLogger);
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
}
});

var connection = new MqttClientConnection(connectPacket, channelAdapter, session, _options, this, _retainedMessagesManager, onStart, onStop, _rootLogger);
MqttClientConnection existingConnection = null;
_connections.AddOrUpdate(connectPacket.ClientId, key =>
{
return connection;
}, (key, tempExistingConnection) =>
{
existingConnection = tempExistingConnection;
return connection;
});

_connections[connection.ClientId] = connection;
_sessions[session.ClientId] = session;
if (existingConnection != null)
{
await existingConnection.StopAsync(true).ConfigureAwait(false);
}

return connection;
}


+ 36
- 0
Tests/MQTTnet.Core.Tests/Session_Tests.cs View File

@@ -1,5 +1,6 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Subscribing;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;
@@ -84,5 +85,40 @@ namespace MQTTnet.Tests
Assert.AreEqual(true, session.Items["can_subscribe_x"]);
}
}


[TestMethod]
public async Task Manage_Session_MaxParallel()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
testEnvironment.IgnoreClientLogErrors = true;
var serverOptions = new MqttServerOptionsBuilder();
await testEnvironment.StartServerAsync(serverOptions);

var options = new MqttClientOptionsBuilder()
.WithClientId("1")
;

var clients = await Task.WhenAll(Enumerable.Range(0, 10)
.Select(i => TryConnect(testEnvironment, options)));

var connectedClients = clients.Where(c => c?.IsConnected ?? false).ToList();

Assert.AreEqual(1, connectedClients.Count);
}
}

private async Task<IMqttClient> TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options)
{
try
{
return await testEnvironment.ConnectClientAsync(options);
}
catch (System.Exception)
{
return null;
}
}
}
}

Loading…
Cancel
Save