diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index d548ff0..97aa3f8 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -52,22 +52,26 @@ namespace MQTTnet.Core.Tests { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(); - await s.StartAsync(); - - var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); - var receivedMessagesCount = 0; - c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); + try + { + await s.StartAsync(); - await c2.DisconnectAsync(); + var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); - await Task.Delay(1000); + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); - await s.StopAsync(); + await c2.DisconnectAsync(); + await Task.Delay(1000); + } + finally + { + await s.StopAsync(); + } Assert.AreEqual(1, receivedMessagesCount); } @@ -76,32 +80,38 @@ namespace MQTTnet.Core.Tests { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(); - await s.StartAsync(); - - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; - c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); + try + { + await s.StartAsync(); - await c2.PublishAsync(message); - Assert.AreEqual(0, receivedMessagesCount); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); - await c2.PublishAsync(message); + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); - await Task.Delay(500); - Assert.AreEqual(1, receivedMessagesCount); + await c2.PublishAsync(message); + Assert.AreEqual(0, receivedMessagesCount); - await c1.UnsubscribeAsync("a"); - await c2.PublishAsync(message); + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + await c2.PublishAsync(message); - await Task.Delay(500); - Assert.AreEqual(1, receivedMessagesCount); + await Task.Delay(500); + Assert.AreEqual(1, receivedMessagesCount); + + await c1.UnsubscribeAsync("a"); + await c2.PublishAsync(message); - await s.StopAsync(); + await Task.Delay(500); + Assert.AreEqual(1, receivedMessagesCount); + } + finally + { + await s.StopAsync(); + } await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); @@ -112,21 +122,27 @@ namespace MQTTnet.Core.Tests { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(); - await s.StartAsync(); - - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - var receivedMessagesCount = 0; - c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); - await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + try + { + await s.StartAsync(); - s.PublishAsync(message).Wait(); - await Task.Delay(500); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await s.StopAsync(); + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); + await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); + + s.PublishAsync(message).Wait(); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } + Assert.AreEqual(1, receivedMessagesCount); } @@ -135,20 +151,26 @@ namespace MQTTnet.Core.Tests { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(); - await s.StartAsync(); + var receivedMessagesCount = 0; - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); - await c1.DisconnectAsync(); + try + { + await s.StartAsync(); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - var receivedMessagesCount = 0; - c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); + await c1.DisconnectAsync(); - await Task.Delay(500); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); - await s.StopAsync(); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } Assert.AreEqual(0, receivedMessagesCount); } @@ -158,21 +180,27 @@ namespace MQTTnet.Core.Tests { var serverAdapter = new TestMqttServerAdapter(); var s = new MqttFactory().CreateMqttServer(); - await s.StartAsync(); - - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); - await c1.DisconnectAsync(); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; - c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + try + { + await s.StartAsync(); - await Task.Delay(500); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + await c1.DisconnectAsync(); - await s.StopAsync(); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } + Assert.AreEqual(1, receivedMessagesCount); } @@ -187,21 +215,27 @@ namespace MQTTnet.Core.Tests .BuildServiceProvider(); var s = new MqttFactory(services).CreateMqttServer(); - await s.StartAsync(); + var receivedMessagesCount = 0; + try + { + await s.StartAsync(); - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); - await c1.DisconnectAsync(); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); + await c1.DisconnectAsync(); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - var receivedMessagesCount = 0; - c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); - await Task.Delay(500); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } - await s.StopAsync(); Assert.AreEqual(0, receivedMessagesCount); } @@ -219,25 +253,37 @@ namespace MQTTnet.Core.Tests .BuildServiceProvider(); var s = new MqttFactory(services).CreateMqttServer(); // TODO: Like here? - await s.StartAsync(); - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); - await c1.DisconnectAsync(); + try + { + await s.StartAsync(); - await s.StopAsync(); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + await c1.DisconnectAsync(); + } + finally + { + await s.StopAsync(); + } s = services.GetRequiredService(); - await s.StartAsync(); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var receivedMessagesCount = 0; - c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); + try + { + await s.StartAsync(); - await Task.Delay(500); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); - await s.StopAsync(); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } Assert.AreEqual(1, receivedMessagesCount); } @@ -259,25 +305,32 @@ namespace MQTTnet.Core.Tests .BuildServiceProvider(); var s = services.GetRequiredService(); - await s.StartAsync(); + try + { + await s.StartAsync(); - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); - var isIntercepted = false; - c2.ApplicationMessageReceived += (sender, args) => - { - isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; - }; + var isIntercepted = false; + c2.ApplicationMessageReceived += (sender, args) => + { + isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; + }; - var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); - await c1.PublishAsync(m); - await c1.DisconnectAsync(); + var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); + await c1.PublishAsync(m); + await c1.DisconnectAsync(); - await Task.Delay(500); + await Task.Delay(500); - Assert.IsTrue(isIntercepted); + Assert.IsTrue(isIntercepted); + } + finally + { + await s.StopAsync(); + } } private class TestStorage : IMqttServerStorage @@ -306,28 +359,34 @@ namespace MQTTnet.Core.Tests var serverAdapter = new TestMqttServerAdapter(); var services = new ServiceCollection() .AddMqttServer() + .AddLogging() .AddSingleton(serverAdapter) .BuildServiceProvider(); var s = services.GetRequiredService(); - await s.StartAsync(); - - var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - var receivedMessagesCount = 0; - c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; + try + { + await s.StartAsync(); - await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); - await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); - await Task.Delay(500); - await c1.UnsubscribeAsync(topicFilter); + c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await Task.Delay(500); + await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); + await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); - await s.StopAsync(); + await Task.Delay(500); + await c1.UnsubscribeAsync(topicFilter); + await Task.Delay(500); + } + finally + { + await s.StopAsync(); + } + Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); } }