Browse Source

fixed tests

release/3.x.x
JanEggers 6 years ago
parent
commit
7154a7b3dc
2 changed files with 46 additions and 11 deletions
  1. +8
    -11
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  2. +38
    -0
      Tests/MQTTnet.Core.Tests/TestServerExtensions.cs

+ 8
- 11
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -209,12 +209,9 @@ namespace MQTTnet.Core.Tests
await s.StartAsync(new MqttServerOptions()); await s.StartAsync(new MqttServerOptions());


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync(); await c1.DisconnectAsync();


await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.

var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
@@ -244,7 +241,7 @@ namespace MQTTnet.Core.Tests
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
await c1.DisconnectAsync(); await c1.DisconnectAsync();
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
@@ -274,7 +271,8 @@ namespace MQTTnet.Core.Tests
await s.StartAsync(options); await s.StartAsync(options);


var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());

await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync(); await c1.DisconnectAsync();
} }
finally finally
@@ -282,8 +280,7 @@ namespace MQTTnet.Core.Tests
await s.StopAsync(); await s.StopAsync();
} }


await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.
Assert.AreEqual(1, storage.Messages.Count);


s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());


@@ -385,17 +382,17 @@ namespace MQTTnet.Core.Tests


private class TestStorage : IMqttServerStorage private class TestStorage : IMqttServerStorage
{ {
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();


public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages) public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{ {
_messages = messages;
Messages = messages;
return Task.CompletedTask; return Task.CompletedTask;
} }


public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{ {
return Task.FromResult(_messages);
return Task.FromResult(Messages);
} }
} }




+ 38
- 0
Tests/MQTTnet.Core.Tests/TestServerExtensions.cs View File

@@ -0,0 +1,38 @@
using MQTTnet.Client;
using MQTTnet.Server;
using System;
using System.Threading.Tasks;

namespace MQTTnet.Core.Tests
{
public static class TestServerExtensions
{
/// <summary>
/// publishes a message with a client and waits in the server until a message with the same topic is received
/// </summary>
/// <returns></returns>
public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message)
{
var tcs = new TaskCompletionSource<object>();

EventHandler<MqttApplicationMessageReceivedEventArgs> handler = (sender, args) =>
{
if (args.ApplicationMessage.Topic == message.Topic)
{
tcs.SetResult(true);
}
};
server.ApplicationMessageReceived += handler;
try
{
await client.PublishAsync(message);
await tcs.Task;
}
finally
{
server.ApplicationMessageReceived -= handler;
}
}
}
}

Loading…
Cancel
Save