From 25ddfc83a8c8a5f459969bae901c43ebc792ac3d Mon Sep 17 00:00:00 2001 From: Michi Date: Sun, 28 Nov 2021 12:41:56 +0100 Subject: [PATCH 1/3] MqttRpcClient now adds ResponseTopic to MqttApplicationMessage when using MQTT v5 (#1295) * MqttRpcClient now adds ResponseTopic to MqttApplicationMessage when using MQTT v5 * No need for conditional protocol check (ResponseTopic property won't be added by PacketFormatter in (TaskCreationOptions.RunContinuationsAsynchronously); #endif - + if (!_waitingCalls.TryAdd(responseTopic, awaitable)) { throw new InvalidOperationException(); diff --git a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs index 1d472b6..09f5561 100644 --- a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs @@ -56,6 +56,25 @@ namespace MQTTnet.Tests return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V500); } + [TestMethod] + public Task Execute_Success_With_QoS_0_MQTT_V5_Use_ResponseTopic() + { + return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtMostOnce); + } + + [TestMethod] + public Task Execute_Success_With_QoS_1_MQTT_V5_Use_ResponseTopic() + { + return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtLeastOnce); + } + + [TestMethod] + public Task Execute_Success_With_QoS_2_MQTT_V5_Use_ResponseTopic() + { + return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.ExactlyOnce); + } + + [TestMethod] [ExpectedException(typeof(MqttCommunicationTimedOutException))] public async Task Execute_Timeout() @@ -63,7 +82,7 @@ namespace MQTTnet.Tests using (var testEnvironment = new TestEnvironment(TestContext)) { await testEnvironment.StartServer(); - + var requestSender = await testEnvironment.ConnectClient(); var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); @@ -81,8 +100,8 @@ namespace MQTTnet.Tests var requestSender = await testEnvironment.ConnectClient(); - var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build()); - + var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()).Build()); + await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); } } @@ -105,12 +124,86 @@ namespace MQTTnet.Tests using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) { var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); - + + Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); + } + } + } + + async Task Execute_Success_MQTT_V5(MqttQualityOfServiceLevel qosLevel) + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServer(); + var responseSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); + await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", qosLevel); + + responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => + { + await responseSender.PublishAsync(e.ApplicationMessage.ResponseTopic, "pong"); + }); + + var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); + + using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) + { + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); + + Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); + } + } + } + + [TestMethod] + public async Task Execute_Success_MQTT_V5_Mixed_Clients() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServer(); + var responseSender = await testEnvironment.ConnectClient(); + await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce); + + responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => + { + Assert.IsNull(e.ApplicationMessage.ResponseTopic); + await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); + }); + + var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); + + using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) + { + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); + Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); } } } + [TestMethod] + [ExpectedException(typeof(MqttCommunicationTimedOutException))] + public async Task Execute_Timeout_MQTT_V5_Mixed_Clients() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + await testEnvironment.StartServer(); + var responseSender = await testEnvironment.ConnectClient(); + await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce); + + responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => + { + Assert.IsNull(e.ApplicationMessage.ResponseTopic); + }); + + var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); + + using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) + { + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); + } + } + } + class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy { public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) From 731c89d2e90b4f409daa54116fbce42ebd911ccd Mon Sep 17 00:00:00 2001 From: logicaloud <47704763+logicaloud@users.noreply.github.com> Date: Mon, 29 Nov 2021 00:51:29 +1300 Subject: [PATCH 2/3] Fix MQTT5 'IsPersisted' session flag handling (#1300) --- .../Server/Internal/MqttClientSession.cs | 13 +- .../Internal/MqttClientSessionsManager.cs | 4 +- .../MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs | 127 +++++++++++++++--- 3 files changed, 117 insertions(+), 27 deletions(-) diff --git a/Source/MQTTnet/Server/Internal/MqttClientSession.cs b/Source/MQTTnet/Server/Internal/MqttClientSession.cs index 291b3e5..a943f5a 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSession.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSession.cs @@ -8,11 +8,6 @@ namespace MQTTnet.Server.Internal { readonly DateTime _createdTimestamp = DateTime.UtcNow; - /// - /// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5) - /// - readonly bool _isPersistent; - public MqttClientSession( string clientId, IDictionary items, @@ -24,17 +19,19 @@ namespace MQTTnet.Server.Internal { ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); Items = items ?? throw new ArgumentNullException(nameof(items)); - + IsPersistent = isPersistent; SubscriptionsManager = new MqttClientSubscriptionsManager(this, serverOptions, eventDispatcher, retainedMessagesManager); ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); - _isPersistent = isPersistent; } public string ClientId { get; } public bool IsCleanSession { get; set; } = true; - public bool IsPersistent => _isPersistent; + /// + /// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5) + /// + public bool IsPersistent { get; set; } public MqttApplicationMessage WillMessage { get; set; } diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 8831a05..471e9df 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -507,7 +507,7 @@ namespace MQTTnet.Server.Internal // in each time it connects. // Persist if SessionExpiryInterval != 0, but may start with a clean session - sessionShouldPersist = context.SessionExpiryInterval != 0; + sessionShouldPersist = context.SessionExpiryInterval.GetValueOrDefault() != 0; } else { @@ -540,6 +540,8 @@ namespace MQTTnet.Server.Internal else { _logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); + // Session persistence could change for MQTT 5 clients that reconnect with different SessionExpiryInterval + session.IsPersistent = sessionShouldPersist; connAckPacket.IsSessionPresent = true; } } diff --git a/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs index 18fde51..ce015b1 100644 --- a/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/MQTTv5/Server_Tests.cs @@ -51,15 +51,8 @@ namespace MQTTnet.Tests.MQTTv5 const string ClientId = "Client1"; // Create client without clean session and long session expiry interval - - var client1 = await testEnvironment.ConnectClient(o => o - .WithProtocolVersion(MqttProtocolVersion.V500) - .WithTcpServer("127.0.0.1", testEnvironment.ServerPort) - .WithSessionExpiryInterval(9999) - .WithCleanSession(false) - .WithClientId(ClientId) - .Build() - ); + var options1 = CreateClientOptions(testEnvironment, ClientId, false, 9999); + var client1 = await testEnvironment.ConnectClient(options1); // Disconnect; empty session should remain on server @@ -72,23 +65,121 @@ namespace MQTTnet.Tests.MQTTv5 // Reconnect the same client ID to existing session var client2 = testEnvironment.CreateClient(); - var options = testEnvironment.Factory.CreateClientOptionsBuilder() - .WithProtocolVersion(MqttProtocolVersion.V500) - .WithTcpServer("127.0.0.1", testEnvironment.ServerPort) - .WithSessionExpiryInterval(9999) - .WithCleanSession(false) - .WithClientId(ClientId) - .Build(); + var options2 = CreateClientOptions(testEnvironment, ClientId, false, 9999); + var result = await client2.ConnectAsync(options2).ConfigureAwait(false); + + await client2.DisconnectAsync(); + + // Session should be present + + Assert.IsTrue(result.IsSessionPresent, "Session not present"); + } + } + + [TestMethod] + public async Task Connect_with_Undefined_SessionExpiryInterval() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + // Create server with persistent sessions enabled + + await testEnvironment.StartServer(o => o.WithPersistentSessions()); + + const string ClientId = "Client1"; + + // Create client without clean session and NO session expiry interval, + // that means, the session should not persist + + var options1 = CreateClientOptions(testEnvironment, ClientId, false, null); + var client1 = await testEnvironment.ConnectClient(options1); + + // Disconnect; no session should remain on server because the session expiry interval was undefined + + await client1.DisconnectAsync(); + + // Simulate some time delay between connections + + await Task.Delay(1000); + + // Reconnect the same client ID to existing session + + var client2 = testEnvironment.CreateClient(); + var options2 = CreateClientOptions(testEnvironment, ClientId, false, 9999); + var result = await client2.ConnectAsync(options2).ConfigureAwait(false); + + await client2.DisconnectAsync(); + + // Session should not be present + + Assert.IsTrue(!result.IsSessionPresent, "Session is present when it should not"); + } + } + + + [TestMethod] + public async Task Reconnect_with_different_SessionExpiryInterval() + { + using (var testEnvironment = new TestEnvironment(TestContext)) + { + // Create server with persistent sessions enabled + + await testEnvironment.StartServer(o => o.WithPersistentSessions()); + + const string ClientId = "Client1"; + + // Create client with clean session and session expiry interval > 0 + + var options = CreateClientOptions(testEnvironment, ClientId, true, 9999); + var client1 = await testEnvironment.ConnectClient(options); + + // Disconnect; session should remain on server + + await client1.DisconnectAsync(); + // Simulate some time delay between connections + + await Task.Delay(1000); - var result = await client2.ConnectAsync(options).ConfigureAwait(false); + // Reconnect the same client ID to the existing session but leave session expiry interval undefined this time. + // Session should be present because the client1 connection had SessionExpiryInterval > 0 + + var client2 = testEnvironment.CreateClient(); + var options2 = CreateClientOptions(testEnvironment, ClientId, false, null); + var result2 = await client2.ConnectAsync(options2).ConfigureAwait(false); await client2.DisconnectAsync(); + Assert.IsTrue(result2.IsSessionPresent, "Session is not present when it should"); + + // Simulate some time delay between connections + + await Task.Delay(1000); + + // Reconnect the same client ID. + // No session should be present because the previous session expiry interval was undefined for the client2 connection + + var client3 = testEnvironment.CreateClient(); + var options3 = CreateClientOptions(testEnvironment, ClientId, false, null); + var result3 = await client2.ConnectAsync(options3).ConfigureAwait(false); + + await client3.DisconnectAsync(); + // Session should be present - Assert.IsTrue(result.IsSessionPresent, "Session not present"); + Assert.IsTrue(!result3.IsSessionPresent, "Session is present when it should not"); + } } + + IMqttClientOptions CreateClientOptions(TestEnvironment testEnvironment, string clientId, bool cleanSession, uint? sessionExpiryInterval) + { + return testEnvironment.Factory.CreateClientOptionsBuilder() + .WithProtocolVersion(MqttProtocolVersion.V500) + .WithTcpServer("127.0.0.1", testEnvironment.ServerPort) + .WithSessionExpiryInterval(sessionExpiryInterval) + .WithCleanSession(cleanSession) + .WithClientId(clientId) + .Build(); + } } } From 39e637795ecb07a2aadf5ae541ea2a78100f3c8b Mon Sep 17 00:00:00 2001 From: patagona Date: Sun, 28 Nov 2021 12:56:32 +0100 Subject: [PATCH 3/3] improve exception message when handling connection errors (#1302) --- Source/MQTTnet/Exceptions/MqttCommunicationTimedOutException.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet/Exceptions/MqttCommunicationTimedOutException.cs b/Source/MQTTnet/Exceptions/MqttCommunicationTimedOutException.cs index df3c425..38ee182 100644 --- a/Source/MQTTnet/Exceptions/MqttCommunicationTimedOutException.cs +++ b/Source/MQTTnet/Exceptions/MqttCommunicationTimedOutException.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Exceptions { } - public MqttCommunicationTimedOutException(Exception innerException) : base(innerException) + public MqttCommunicationTimedOutException(Exception innerException) : base("The operation has timed out.", innerException) { } }