From cf2974cd342f25fabc02447cf0f6d7d29403c9bc Mon Sep 17 00:00:00 2001 From: Christian <6939810+chkr1011@users.noreply.github.com> Date: Sun, 20 Mar 2022 15:25:38 +0100 Subject: [PATCH] Update Subscribe_Tests.cs --- .../MQTTnet.Tests/Server/Subscribe_Tests.cs | 241 +++++++++--------- 1 file changed, 125 insertions(+), 116 deletions(-) diff --git a/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs b/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs index f79fcdf..cc263fe 100644 --- a/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Subscribe_Tests.cs @@ -18,6 +18,33 @@ namespace MQTTnet.Tests.Server [TestClass] public sealed class Subscribe_Tests : BaseTestClass { + [TestMethod] + public async Task Deny_Invalid_Topic() + { + using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500)) + { + var server = await testEnvironment.StartServer(); + + server.InterceptingSubscriptionAsync += e => + { + if (e.TopicFilter.Topic == "not_allowed_topic") + { + e.Response.ReasonCode = MqttSubscribeReasonCode.TopicFilterInvalid; + } + + return PlatformAbstractionLayer.CompletedTask; + }; + + var client = await testEnvironment.ConnectClient(); + + var subscribeResult = await client.SubscribeAsync("allowed_topic"); + Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items.First().ResultCode); + + subscribeResult = await client.SubscribeAsync("not_allowed_topic"); + Assert.AreEqual(MqttClientSubscribeResultCode.TopicFilterInvalid, subscribeResult.Items.First().ResultCode); + } + } + [TestMethod] public async Task Intercept_Subscription() { @@ -31,7 +58,7 @@ namespace MQTTnet.Tests.Server e.TopicFilter.Topic = "a"; return PlatformAbstractionLayer.CompletedTask; }; - + var topicAReceived = false; var topicBReceived = false; @@ -46,7 +73,7 @@ namespace MQTTnet.Tests.Server { topicBReceived = true; } - + return PlatformAbstractionLayer.CompletedTask; }; @@ -62,67 +89,27 @@ namespace MQTTnet.Tests.Server } [TestMethod] - public async Task Subscribe_Unsubscribe() + public async Task Response_Contains_Equal_Reason_Codes() { using (var testEnvironment = CreateTestEnvironment()) { - var receivedMessagesCount = 0; - - var server = await testEnvironment.StartServer(); - - var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c1")); - c1.ApplicationMessageReceivedAsync += e => - { - Interlocked.Increment(ref receivedMessagesCount); - return PlatformAbstractionLayer.CompletedTask; - }; - - var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c2")); - - var message = new MqttApplicationMessageBuilder().WithTopic("a").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build(); - await c2.PublishAsync(message); - - await Task.Delay(500); - Assert.AreEqual(0, receivedMessagesCount); - - var subscribeEventCalled = false; - server.ClientSubscribedTopicAsync += e => - { - subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId; - return PlatformAbstractionLayer.CompletedTask; - }; - - await c1.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); - await Task.Delay(250); - Assert.IsTrue(subscribeEventCalled, "Subscribe event not called."); - - await c2.PublishAsync(message); - await Task.Delay(250); - Assert.AreEqual(1, receivedMessagesCount); - - var unsubscribeEventCalled = false; - server.ClientUnsubscribedTopicAsync += e => - { - unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId; - return PlatformAbstractionLayer.CompletedTask; - }; - - await c1.UnsubscribeAsync("a"); - await Task.Delay(250); - Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called."); - - await c2.PublishAsync(message); - await Task.Delay(500); - Assert.AreEqual(1, receivedMessagesCount); + await testEnvironment.StartServer(); + var client = await testEnvironment.ConnectClient(); - await Task.Delay(500); + var subscribeOptions = new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a") + .WithTopicFilter("b", MqttQualityOfServiceLevel.AtLeastOnce) + .WithTopicFilter("c", MqttQualityOfServiceLevel.ExactlyOnce) + .WithTopicFilter("d") + .Build(); + + var response = await client.SubscribeAsync(subscribeOptions); - Assert.AreEqual(1, receivedMessagesCount); + Assert.AreEqual(subscribeOptions.TopicFilters.Count, response.Items.Count); } } [TestMethod] - public async Task Subscribe_Multiple_In_Single_Request() + public async Task Subscribe_Lots_In_Multiple_Requests() { using (var testEnvironment = CreateTestEnvironment()) { @@ -136,26 +123,31 @@ namespace MQTTnet.Tests.Server Interlocked.Increment(ref receivedMessagesCount); return PlatformAbstractionLayer.CompletedTask; }; - - await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder() - .WithTopicFilter("a") - .WithTopicFilter("b") - .WithTopicFilter("c") - .Build()); + + for (var i = 0; i < 500; i++) + { + var so = new MqttClientSubscribeOptionsBuilder().WithTopicFilter(i.ToString()).Build(); + + await c1.SubscribeAsync(so).ConfigureAwait(false); + + await Task.Delay(10); + } var c2 = await testEnvironment.ConnectClient(); - await c2.PublishStringAsync("a"); - await Task.Delay(100); - Assert.AreEqual(receivedMessagesCount, 1); + var messageBuilder = new MqttApplicationMessageBuilder(); + for (var i = 0; i < 500; i++) + { + messageBuilder.WithTopic(i.ToString()); - await c2.PublishStringAsync("b"); - await Task.Delay(100); - Assert.AreEqual(receivedMessagesCount, 2); + await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false); - await c2.PublishStringAsync("c"); - await Task.Delay(100); - Assert.AreEqual(receivedMessagesCount, 3); + await Task.Delay(10); + } + + SpinWait.SpinUntil(() => receivedMessagesCount == 500, 5000); + + Assert.AreEqual(500, receivedMessagesCount); } } @@ -200,7 +192,7 @@ namespace MQTTnet.Tests.Server } [TestMethod] - public async Task Subscribe_Lots_In_Multiple_Requests() + public async Task Subscribe_Multiple_In_Multiple_Request() { using (var testEnvironment = CreateTestEnvironment()) { @@ -215,36 +207,30 @@ namespace MQTTnet.Tests.Server return PlatformAbstractionLayer.CompletedTask; }; - for (var i = 0; i < 500; i++) - { - var so = new MqttClientSubscribeOptionsBuilder() - .WithTopicFilter(i.ToString()).Build(); + await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a").Build()); - await c1.SubscribeAsync(so).ConfigureAwait(false); + await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("b").Build()); - await Task.Delay(10); - } + await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("c").Build()); var c2 = await testEnvironment.ConnectClient(); - var messageBuilder = new MqttApplicationMessageBuilder(); - for (var i = 0; i < 500; i++) - { - messageBuilder.WithTopic(i.ToString()); - - await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false); - - await Task.Delay(10); - } + await c2.PublishStringAsync("a"); + await Task.Delay(100); + Assert.AreEqual(receivedMessagesCount, 1); - SpinWait.SpinUntil(() => receivedMessagesCount == 500, 5000); + await c2.PublishStringAsync("b"); + await Task.Delay(100); + Assert.AreEqual(receivedMessagesCount, 2); - Assert.AreEqual(500, receivedMessagesCount); + await c2.PublishStringAsync("c"); + await Task.Delay(100); + Assert.AreEqual(receivedMessagesCount, 3); } } [TestMethod] - public async Task Subscribe_Multiple_In_Multiple_Request() + public async Task Subscribe_Multiple_In_Single_Request() { using (var testEnvironment = CreateTestEnvironment()) { @@ -258,18 +244,8 @@ namespace MQTTnet.Tests.Server Interlocked.Increment(ref receivedMessagesCount); return PlatformAbstractionLayer.CompletedTask; }; - - await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder() - .WithTopicFilter("a") - .Build()); - - await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder() - .WithTopicFilter("b") - .Build()); - await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder() - .WithTopicFilter("c") - .Build()); + await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a").WithTopicFilter("b").WithTopicFilter("c").Build()); var c2 = await testEnvironment.ConnectClient(); @@ -286,31 +262,64 @@ namespace MQTTnet.Tests.Server Assert.AreEqual(receivedMessagesCount, 3); } } - + [TestMethod] - public async Task Deny_Invalid_Topic() + public async Task Subscribe_Unsubscribe() { - using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500)) + using (var testEnvironment = CreateTestEnvironment()) { + var receivedMessagesCount = 0; + var server = await testEnvironment.StartServer(); - server.InterceptingSubscriptionAsync += e => + var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c1")); + c1.ApplicationMessageReceivedAsync += e => { - if (e.TopicFilter.Topic == "not_allowed_topic") - { - e.Response.ReasonCode = MqttSubscribeReasonCode.TopicFilterInvalid; - } - + Interlocked.Increment(ref receivedMessagesCount); return PlatformAbstractionLayer.CompletedTask; }; - - var client = await testEnvironment.ConnectClient(); - - var subscribeResult =await client.SubscribeAsync("allowed_topic"); - Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items.First().ResultCode); - subscribeResult =await client.SubscribeAsync("not_allowed_topic"); - Assert.AreEqual(MqttClientSubscribeResultCode.TopicFilterInvalid, subscribeResult.Items.First().ResultCode); + var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c2")); + + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build(); + await c2.PublishAsync(message); + + await Task.Delay(500); + Assert.AreEqual(0, receivedMessagesCount); + + var subscribeEventCalled = false; + server.ClientSubscribedTopicAsync += e => + { + subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId; + return PlatformAbstractionLayer.CompletedTask; + }; + + await c1.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }); + await Task.Delay(250); + Assert.IsTrue(subscribeEventCalled, "Subscribe event not called."); + + await c2.PublishAsync(message); + await Task.Delay(250); + Assert.AreEqual(1, receivedMessagesCount); + + var unsubscribeEventCalled = false; + server.ClientUnsubscribedTopicAsync += e => + { + unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId; + return PlatformAbstractionLayer.CompletedTask; + }; + + await c1.UnsubscribeAsync("a"); + await Task.Delay(250); + Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called."); + + await c2.PublishAsync(message); + await Task.Delay(500); + Assert.AreEqual(1, receivedMessagesCount); + + await Task.Delay(500); + + Assert.AreEqual(1, receivedMessagesCount); } } }