Procházet zdrojové kódy

Fix MQTT 3.1.1 session persistence when CleanSession=1 (#1294)

release/3.x.x
logicaloud před 3 roky
committed by GitHub
rodič
revize
f0ea13c07f
V databázi nebyl nalezen žádný známý klíč pro tento podpis ID GPG klíče: 4AEE18F83AFDEB23
6 změnil soubory, kde provedl 107 přidání a 17 odebrání
  1. +12
    -2
      Source/MQTTnet/Server/Internal/MqttClientSession.cs
  2. +34
    -7
      Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
  3. +2
    -2
      Tests/MQTTnet.Core.Tests/Server/General.cs
  4. +3
    -2
      Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs
  5. +53
    -1
      Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs
  6. +3
    -3
      Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs

+ 12
- 2
Source/MQTTnet/Server/Internal/MqttClientSession.cs Zobrazit soubor

@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using MQTTnet.Server.Status;

@@ -8,24 +8,34 @@ namespace MQTTnet.Server.Internal
{
readonly DateTime _createdTimestamp = DateTime.UtcNow;

/// <summary>
/// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5)
/// </summary>
readonly bool _isPersistent;

public MqttClientSession(
string clientId,
IDictionary<object, object> items,
MqttServerEventDispatcher eventDispatcher,
IMqttServerOptions serverOptions,
IMqttRetainedMessagesManager retainedMessagesManager)
IMqttRetainedMessagesManager retainedMessagesManager,
bool isPersistent
)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
Items = items ?? throw new ArgumentNullException(nameof(items));

SubscriptionsManager = new MqttClientSubscriptionsManager(this, serverOptions, eventDispatcher, retainedMessagesManager);
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions);
_isPersistent = isPersistent;
}

public string ClientId { get; }

public bool IsCleanSession { get; set; } = true;

public bool IsPersistent => _isPersistent;

public MqttApplicationMessage WillMessage { get; set; }

public MqttClientSubscriptionsManager SubscriptionsManager { get; }


+ 34
- 7
Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs Zobrazit soubor

@@ -126,7 +126,7 @@ namespace MQTTnet.Server.Internal

// Pass connAckPacket so that IsSessionPresent flag can be set if the client session already exists
clientConnection = await CreateClientConnection(connectPacket, connAckPacket, channelAdapter,
connectionValidatorContext.SessionItems).ConfigureAwait(false);
connectionValidatorContext).ConfigureAwait(false);

await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(false);

@@ -156,7 +156,7 @@ namespace MQTTnet.Server.Internal
_clientConnections.Remove(clientConnection.ClientId);
}

if (!_options.EnablePersistentSessions)
if ((!_options.EnablePersistentSessions) || (!clientConnection.Session.IsPersistent))
{
await DeleteSessionAsync(clientConnection.ClientId).ConfigureAwait(false);
}
@@ -491,10 +491,35 @@ namespace MQTTnet.Server.Internal
MqttConnectPacket connectPacket,
MqttConnAckPacket connAckPacket,
IMqttChannelAdapter channelAdapter,
IDictionary<object, object> sessionItems)
MqttConnectionValidatorContext context)
{
MqttClientConnection connection;

bool sessionShouldPersist;

if (context.ProtocolVersion == MqttProtocolVersion.V500)
{
// MQTT 5.0 section 3.1.2.11.2
// The Client and Server MUST store the Session State after the Network Connection is closed if the Session Expiry Interval is greater than 0 [MQTT-3.1.2-23].
//
// A Client that only wants to process messages while connected will set the Clean Start to 1 and set the Session Expiry Interval to 0.
// It will not receive Application Messages published before it connected and has to subscribe afresh to any topics that it is interested
// in each time it connects.

// Persist if SessionExpiryInterval != 0, but may start with a clean session
sessionShouldPersist = context.SessionExpiryInterval != 0;
}
else
{
// MQTT 3.1.1 section 3.1.2.4: persist only if 'not CleanSession'
//
// If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one.
// This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be
// reused in any subsequent Session [MQTT-3.1.2-6].

sessionShouldPersist = !connectPacket.CleanSession;
}

using (await _createConnectionSyncRoot.WaitAsync(CancellationToken.None).ConfigureAwait(false))
{
MqttClientSession session;
@@ -503,14 +528,14 @@ namespace MQTTnet.Server.Internal
if (!_clientSessions.TryGetValue(connectPacket.ClientId, out session))
{
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, sessionItems);
session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist);
}
else
{
if (connectPacket.CleanSession)
{
_logger.Verbose("Deleting existing session of client '{0}'.", connectPacket.ClientId);
session = CreateSession(connectPacket.ClientId, sessionItems);
session = CreateSession(connectPacket.ClientId, context.SessionItems, sessionShouldPersist);
}
else
{
@@ -604,14 +629,16 @@ namespace MQTTnet.Server.Internal
_rootLogger);
}

MqttClientSession CreateSession(string clientId, IDictionary<object, object> sessionItems)
MqttClientSession CreateSession(string clientId, IDictionary<object, object> sessionItems, bool isPersistent)
{
return new MqttClientSession(
clientId,
sessionItems,
_eventDispatcher,
_options,
_retainedMessagesManager);
_retainedMessagesManager,
isPersistent
);
}
}
}

+ 2
- 2
Tests/MQTTnet.Core.Tests/Server/General.cs Zobrazit soubor

@@ -1167,7 +1167,7 @@ namespace MQTTnet.Tests.Server
var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions());

// Create the session including the subscription.
var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a"));
var client1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("a").WithCleanSession(false));
await client1.SubscribeAsync("x");
await client1.DisconnectAsync();
await Task.Delay(500);
@@ -1175,7 +1175,7 @@ namespace MQTTnet.Tests.Server
var clientStatus = await server.GetClientStatusAsync();
Assert.AreEqual(0, clientStatus.Count);

var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b"));
var client2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("b").WithCleanSession(false));
await client2.PublishAsync("x", "1");
await client2.PublishAsync("x", "2");
await client2.PublishAsync("x", "3");


+ 3
- 2
Tests/MQTTnet.Core.Tests/Server/MqttSubscriptionsManager_Tests.cs Zobrazit soubor

@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Packets;
@@ -107,7 +107,8 @@ namespace MQTTnet.Tests.Server
new ConcurrentDictionary<object, object>(),
new MqttServerEventDispatcher(new TestLogger()),
new MqttServerOptions(),
new MqttRetainedMessagesManager());
new MqttRetainedMessagesManager(),
false);
}
}
}

+ 53
- 1
Tests/MQTTnet.Core.Tests/Server/Session_Tests.cs Zobrazit soubor

@@ -1,4 +1,4 @@
using System.Linq;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -107,6 +107,58 @@ namespace MQTTnet.Tests.Server
}
}

[TestMethod]
public async Task Clean_Session_Persistence()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
// Create server with persistent sessions enabled

await testEnvironment.StartServer(o => o.WithPersistentSessions());

const string ClientId = "Client1";

// Create client with clean session and long session expiry interval

var client1 = await testEnvironment.ConnectClient(o => o
.WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
.WithCleanSession(true) // start and end with clean session
.WithClientId(ClientId)
.Build()
);

// Disconnect; empty session should be removed from server

await client1.DisconnectAsync();

// Simulate some time delay between connections

await Task.Delay(1000);

// Reconnect the same client ID without clean session

var client2 = testEnvironment.CreateClient();
var options = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
.WithCleanSession(false) // see if there is a session
.WithClientId(ClientId)
.Build();


var result = await client2.ConnectAsync(options).ConfigureAwait(false);

await client2.DisconnectAsync();

// Session should NOT be present for MQTT v311 and initial CleanSession == true

Assert.IsTrue(!result.IsSessionPresent, "Session present");
}
}

async Task<IMqttClient> TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options)
{
try


+ 3
- 3
Tests/MQTTnet.Core.Tests/Server/Status_Tests.cs Zobrazit soubor

@@ -1,4 +1,4 @@
using System.Linq;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
@@ -81,8 +81,8 @@ namespace MQTTnet.Tests.Server
{
var server = await testEnvironment.StartServer(new MqttServerOptionsBuilder().WithPersistentSessions());

var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1"));
var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2"));
var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client1").WithCleanSession(false));
var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("client2").WithCleanSession(false));

await c1.DisconnectAsync();



Načítá se…
Zrušit
Uložit