diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 752601f..f0c74ea 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -209,12 +209,9 @@ namespace MQTTnet.Core.Tests await s.StartAsync(new MqttServerOptions()); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.DisconnectAsync(); - await Task.Delay(TimeSpan.FromSeconds(2)); - // TODO: Find another way to wait for the retained components. - var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); @@ -244,7 +241,7 @@ namespace MQTTnet.Core.Tests await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); await c1.DisconnectAsync(); - + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); @@ -274,7 +271,8 @@ namespace MQTTnet.Core.Tests await s.StartAsync(options); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); - await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); + + await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.DisconnectAsync(); } finally @@ -282,8 +280,7 @@ namespace MQTTnet.Core.Tests await s.StopAsync(); } - await Task.Delay(TimeSpan.FromSeconds(2)); - // TODO: Find another way to wait for the retained components. + Assert.AreEqual(1, storage.Messages.Count); s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); @@ -385,17 +382,17 @@ namespace MQTTnet.Core.Tests private class TestStorage : IMqttServerStorage { - private IList _messages = new List(); + public IList Messages = new List(); public Task SaveRetainedMessagesAsync(IList messages) { - _messages = messages; + Messages = messages; return Task.CompletedTask; } public Task> LoadRetainedMessagesAsync() { - return Task.FromResult(_messages); + return Task.FromResult(Messages); } } diff --git a/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs b/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs new file mode 100644 index 0000000..2826f8a --- /dev/null +++ b/Tests/MQTTnet.Core.Tests/TestServerExtensions.cs @@ -0,0 +1,38 @@ +using MQTTnet.Client; +using MQTTnet.Server; +using System; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Tests +{ + public static class TestServerExtensions + { + /// + /// publishes a message with a client and waits in the server until a message with the same topic is received + /// + /// + public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message) + { + var tcs = new TaskCompletionSource(); + + EventHandler handler = (sender, args) => + { + if (args.ApplicationMessage.Topic == message.Topic) + { + tcs.SetResult(true); + } + }; + server.ApplicationMessageReceived += handler; + + try + { + await client.PublishAsync(message); + await tcs.Task; + } + finally + { + server.ApplicationMessageReceived -= handler; + } + } + } +}