diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 0404ccc..fc40377 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -5,6 +5,7 @@ using MQTTnet.Protocol; using MQTTnet.Server; using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -98,11 +99,11 @@ namespace MQTTnet.Core.Tests c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); - + await c2.PublishAsync(message); await Task.Delay(1000); Assert.AreEqual(0, receivedMessagesCount); - + var subscribeEventCalled = false; s.ClientSubscribedTopic += (_, e) => { @@ -116,7 +117,7 @@ namespace MQTTnet.Core.Tests await c2.PublishAsync(message); await Task.Delay(500); Assert.AreEqual(1, receivedMessagesCount); - + var unsubscribeEventCalled = false; s.ClientUnsubscribedTopic += (_, e) => { @@ -234,7 +235,7 @@ namespace MQTTnet.Core.Tests await c1.PublishAsync(message); } } - + [TestMethod] public async Task MqttServer_ShutdownDisconnectsClientsGracefully() { @@ -303,6 +304,57 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled); } + [TestMethod] + public async Task MqttServer_LotsOfRetainedMessages() + { + const int ClientCount = 100; + + var server = new MqttFactory().CreateMqttServer(); + try + { + await server.StartAsync(new MqttServerOptionsBuilder().Build()); + + Parallel.For( + 0, + ClientCount, + new ParallelOptions { MaxDegreeOfParallelism = 10 }, + i => + { + using (var client = new MqttFactory().CreateMqttClient()) + { + client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()) + .GetAwaiter().GetResult(); + + for (var j = 0; j < 10; j++) + { + // Clear retained message. + client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i) + .WithPayload(new byte[0]).WithRetainFlag().Build()).GetAwaiter().GetResult(); + + // Set retained message. + client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i) + .WithPayload("value" + j).WithRetainFlag().Build()).GetAwaiter().GetResult(); + } + + client.DisconnectAsync().GetAwaiter().GetResult(); + } + }); + + var retainedMessages = server.GetRetainedMessages(); + + Assert.AreEqual(ClientCount, retainedMessages.Count); + + for (var i = 0; i < ClientCount; i++) + { + Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i)); + } + } + finally + { + await server.StopAsync(); + } + } + [TestMethod] public async Task MqttServer_RetainedMessagesFlow() { @@ -568,7 +620,7 @@ namespace MQTTnet.Core.Tests await server.StartAsync(options); - + var clientOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost").Build(); @@ -593,7 +645,7 @@ namespace MQTTnet.Core.Tests { await client.DisconnectAsync(); await server.StopAsync(); - + client.Dispose(); } } @@ -646,6 +698,33 @@ namespace MQTTnet.Core.Tests Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); } + + [TestMethod] + public async Task MqttServer_StopAndRestart() + { + var server = new MqttFactory().CreateMqttServer(); + await server.StartAsync(new MqttServerOptions()); + + var client = new MqttFactory().CreateMqttClient(); + await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + await server.StopAsync(); + + try + { + var client2 = new MqttFactory().CreateMqttClient(); + await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + + Assert.Fail("Connecting should fail."); + } + catch (Exception) + { + } + + await server.StartAsync(new MqttServerOptions()); + var client3 = new MqttFactory().CreateMqttClient(); + await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + } + private class TestStorage : IMqttServerStorage { public IList Messages = new List(); diff --git a/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs b/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs new file mode 100644 index 0000000..7b5ad77 --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MQTTnet.Client; +using MQTTnet.Server; + +namespace MQTTnet.Core.Tests +{ + [TestClass] + public class RoundtripTimeTests + { + [TestMethod] + public async Task Round_Trip_Time() + { + var factory = new MqttFactory(); + var server = factory.CreateMqttServer(); + var receiverClient = factory.CreateMqttClient(); + var senderClient = factory.CreateMqttClient(); + + await server.StartAsync(new MqttServerOptionsBuilder().Build()); + + await receiverClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + await receiverClient.SubscribeAsync("#"); + + await senderClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); + + TaskCompletionSource response = null; + + receiverClient.ApplicationMessageReceived += (sender, args) => + { + response?.SetResult(args.ApplicationMessage.ConvertPayloadToString()); + }; + + var times = new List(); + var stopwatch = Stopwatch.StartNew(); + + for (var i = 0; i < 100; i++) + { + response = new TaskCompletionSource(); + await senderClient.PublishAsync("test", DateTime.UtcNow.Ticks.ToString()); + response.Task.GetAwaiter().GetResult(); + + stopwatch.Stop(); + times.Add(stopwatch.Elapsed); + stopwatch.Restart(); + } + } + } +} diff --git a/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs b/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs index cbb278c..7636b1a 100644 --- a/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs +++ b/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs @@ -11,19 +11,20 @@ namespace MQTTnet.Core.Tests /// publishes a message with a client and waits in the server until a message with the same topic is received /// /// - public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message) + public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message) { var tcs = new TaskCompletionSource(); - EventHandler handler = (sender, args) => + void Handler(object sender, MqttApplicationMessageReceivedEventArgs args) { if (args.ApplicationMessage.Topic == message.Topic) { tcs.SetResult(true); } - }; - server.ApplicationMessageReceived += handler; - + } + + server.ApplicationMessageReceived += Handler; + try { await client.PublishAsync(message).ConfigureAwait(false); @@ -31,7 +32,7 @@ namespace MQTTnet.Core.Tests } finally { - server.ApplicationMessageReceived -= handler; + server.ApplicationMessageReceived -= Handler; } } }