From ad5834dd1f4da1c7f53bb26da9946528a2a1d0ba Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Tue, 2 Apr 2019 21:50:16 +0200 Subject: [PATCH] Add long runnings tests for all QoS levels. --- Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs | 54 +++++- .../Mockups/TestMqttCommunicationAdapter.cs | 7 +- .../MQTTnet.Core.Tests/Server_Status_Tests.cs | 7 +- .../PerformanceTest.cs | 155 +++++++++++++++++- Tests/MQTTnet.TestApp.NetCore/Program.cs | 17 +- 5 files changed, 229 insertions(+), 11 deletions(-) diff --git a/Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs b/Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs index ebe3b19..c222cdf 100644 --- a/Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/AsyncQueue_Tests.cs @@ -16,11 +16,27 @@ namespace MQTTnet.Tests queue.Enqueue("2"); queue.Enqueue("3"); - Assert.AreEqual("1", await queue.DequeueAsync(CancellationToken.None)); - Assert.AreEqual("2", await queue.DequeueAsync(CancellationToken.None)); - Assert.AreEqual("3", await queue.DequeueAsync(CancellationToken.None)); + Assert.AreEqual("1", (await queue.TryDequeueAsync(CancellationToken.None)).Item); + Assert.AreEqual("2", (await queue.TryDequeueAsync(CancellationToken.None)).Item); + Assert.AreEqual("3", (await queue.TryDequeueAsync(CancellationToken.None)).Item); } + [TestMethod] + public void Count() + { + var queue = new AsyncQueue(); + + queue.Enqueue("1"); + Assert.AreEqual(1, queue.Count); + + queue.Enqueue("2"); + Assert.AreEqual(2, queue.Count); + + queue.Enqueue("3"); + Assert.AreEqual(3, queue.Count); + } + + [TestMethod] public async Task Preserve_ProcessAsync() { @@ -31,7 +47,7 @@ namespace MQTTnet.Tests { while (sum < 6) { - sum += await queue.DequeueAsync(CancellationToken.None); + sum += (await queue.TryDequeueAsync(CancellationToken.None)).Item; } }); @@ -47,5 +63,35 @@ namespace MQTTnet.Tests Assert.AreEqual(6, sum); Assert.AreEqual(TaskStatus.RanToCompletion, worker.Status); } + + [TestMethod] + public void Dequeue_Sync() + { + var queue = new AsyncQueue(); + queue.Enqueue("1"); + queue.Enqueue("2"); + queue.Enqueue("3"); + + Assert.AreEqual("1", queue.TryDequeue().Item); + Assert.AreEqual("2", queue.TryDequeue().Item); + Assert.AreEqual("3", queue.TryDequeue().Item); + } + + [TestMethod] + public void Clear() + { + var queue = new AsyncQueue(); + queue.Enqueue("1"); + queue.Enqueue("2"); + queue.Enqueue("3"); + + queue.Clear(); + Assert.AreEqual(0, queue.Count); + + queue.Enqueue("4"); + + Assert.AreEqual(1, queue.Count); + Assert.AreEqual("4", queue.TryDequeue().Item); + } } } diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs index 54441ef..a58a77f 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestMqttCommunicationAdapter.cs @@ -20,8 +20,11 @@ namespace MQTTnet.Tests.Mockups public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311); - public event EventHandler ReadingPacketStarted; - public event EventHandler ReadingPacketCompleted; + public long BytesSent { get; } + public long BytesReceived { get; } + + public Action ReadingPacketStartedCallback { get; set; } + public Action ReadingPacketCompletedCallback { get; set; } public void Dispose() { diff --git a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs index 4822bea..8111bd0 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Status_Tests.cs @@ -1,5 +1,4 @@ -using System; -using System.Linq; +using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; using System.Threading.Tasks; using MQTTnet.Client.Options; @@ -148,7 +147,9 @@ namespace MQTTnet.Tests var clientStatus = await server.GetClientStatusAsync(); Assert.AreEqual(i, clientStatus.First().SentApplicationMessagesCount, "SAMC invalid!"); - Assert.AreEqual(i, clientStatus.First().SentPacketsCount, "SPC invalid!"); + + // + 1 because CONNECT is also counted. + Assert.AreEqual(i + 1, clientStatus.First().SentPacketsCount, "SPC invalid!"); // +1 because ConnACK package is already counted. Assert.AreEqual(i + 1, clientStatus.First().ReceivedPacketsCount, "RPC invalid!"); diff --git a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs index 701f45c..466d206 100644 --- a/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/PerformanceTest.cs @@ -160,7 +160,7 @@ namespace MQTTnet.TestApp.NetCore msgCount += msgs.Count; //send multiple } - + var now = DateTime.Now; if (last < now - TimeSpan.FromSeconds(1)) { @@ -193,5 +193,158 @@ namespace MQTTnet.TestApp.NetCore Interlocked.Increment(ref count); return Task.Run(() => client.PublishAsync(applicationMessage)); } + + public static async Task RunQoS2Test() + { + try + { + var mqttServer = new MqttFactory().CreateMqttServer(); + await mqttServer.StartAsync(new MqttServerOptions()); + + var options = new MqttClientOptions + { + ChannelOptions = new MqttClientTcpOptions + { + Server = "127.0.0.1" + }, + CleanSession = true + }; + + var client = new MqttFactory().CreateMqttClient(); + await client.ConnectAsync(options); + + var message = new MqttApplicationMessage + { + Topic = "A/B/C", + Payload = Encoding.UTF8.GetBytes("Hello World"), + QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce + }; + + var stopwatch = new Stopwatch(); + + var iteration = 1; + while (true) + { + stopwatch.Restart(); + + var sentMessagesCount = 0; + while (stopwatch.ElapsedMilliseconds < 1000) + { + await client.PublishAsync(message).ConfigureAwait(false); + sentMessagesCount++; + } + + Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); + + iteration++; + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + + public static async Task RunQoS1Test() + { + try + { + var mqttServer = new MqttFactory().CreateMqttServer(); + await mqttServer.StartAsync(new MqttServerOptions()); + + var options = new MqttClientOptions + { + ChannelOptions = new MqttClientTcpOptions + { + Server = "127.0.0.1" + }, + CleanSession = true + }; + + var client = new MqttFactory().CreateMqttClient(); + await client.ConnectAsync(options); + + var message = new MqttApplicationMessage + { + Topic = "A/B/C", + Payload = Encoding.UTF8.GetBytes("Hello World"), + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce + }; + + var stopwatch = new Stopwatch(); + + var iteration = 1; + while (true) + { + stopwatch.Restart(); + + var sentMessagesCount = 0; + while (stopwatch.ElapsedMilliseconds < 1000) + { + await client.PublishAsync(message).ConfigureAwait(false); + sentMessagesCount++; + } + + Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); + + iteration++; + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + + public static async Task RunQoS0Test() + { + try + { + var mqttServer = new MqttFactory().CreateMqttServer(); + await mqttServer.StartAsync(new MqttServerOptions()); + + var options = new MqttClientOptions + { + ChannelOptions = new MqttClientTcpOptions + { + Server = "127.0.0.1" + }, + CleanSession = true + }; + + var client = new MqttFactory().CreateMqttClient(); + await client.ConnectAsync(options); + + var message = new MqttApplicationMessage + { + Topic = "A/B/C", + Payload = Encoding.UTF8.GetBytes("Hello World"), + QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce + }; + + var stopwatch = new Stopwatch(); + + var iteration = 1; + while (true) + { + stopwatch.Restart(); + + var sentMessagesCount = 0; + while (stopwatch.ElapsedMilliseconds < 1000) + { + await client.PublishAsync(message).ConfigureAwait(false); + sentMessagesCount++; + } + + Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); + + iteration++; + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } } } diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index e92454e..f8fd75d 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -25,6 +25,9 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine("7 = Client flow test"); Console.WriteLine("8 = Start performance test (client only)"); Console.WriteLine("9 = Start server (no trace)"); + Console.WriteLine("a = Start QoS 2 benchmark"); + Console.WriteLine("b = Start QoS 1 benchmark"); + Console.WriteLine("c = Start QoS 0 benchmark"); var pressedKey = Console.ReadKey(true); if (pressedKey.KeyChar == '1') @@ -66,7 +69,19 @@ namespace MQTTnet.TestApp.NetCore ServerTest.RunEmptyServer(); return; } - + else if (pressedKey.KeyChar == 'a') + { + Task.Run(PerformanceTest.RunQoS2Test); + } + else if (pressedKey.KeyChar == 'b') + { + Task.Run(PerformanceTest.RunQoS1Test); + } + else if (pressedKey.KeyChar == 'c') + { + Task.Run(PerformanceTest.RunQoS0Test); + } + Thread.Sleep(Timeout.Infinite); }