Browse Source

Update Subscribe_Tests.cs

master
Christian 2 years ago
parent
commit
cf2974cd34
1 changed files with 125 additions and 116 deletions
  1. +125
    -116
      Source/MQTTnet.Tests/Server/Subscribe_Tests.cs

+ 125
- 116
Source/MQTTnet.Tests/Server/Subscribe_Tests.cs View File

@@ -18,6 +18,33 @@ namespace MQTTnet.Tests.Server
[TestClass]
public sealed class Subscribe_Tests : BaseTestClass
{
[TestMethod]
public async Task Deny_Invalid_Topic()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
var server = await testEnvironment.StartServer();

server.InterceptingSubscriptionAsync += e =>
{
if (e.TopicFilter.Topic == "not_allowed_topic")
{
e.Response.ReasonCode = MqttSubscribeReasonCode.TopicFilterInvalid;
}

return PlatformAbstractionLayer.CompletedTask;
};

var client = await testEnvironment.ConnectClient();

var subscribeResult = await client.SubscribeAsync("allowed_topic");
Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items.First().ResultCode);

subscribeResult = await client.SubscribeAsync("not_allowed_topic");
Assert.AreEqual(MqttClientSubscribeResultCode.TopicFilterInvalid, subscribeResult.Items.First().ResultCode);
}
}

[TestMethod]
public async Task Intercept_Subscription()
{
@@ -31,7 +58,7 @@ namespace MQTTnet.Tests.Server
e.TopicFilter.Topic = "a";
return PlatformAbstractionLayer.CompletedTask;
};
var topicAReceived = false;
var topicBReceived = false;

@@ -46,7 +73,7 @@ namespace MQTTnet.Tests.Server
{
topicBReceived = true;
}
return PlatformAbstractionLayer.CompletedTask;
};

@@ -62,67 +89,27 @@ namespace MQTTnet.Tests.Server
}

[TestMethod]
public async Task Subscribe_Unsubscribe()
public async Task Response_Contains_Equal_Reason_Codes()
{
using (var testEnvironment = CreateTestEnvironment())
{
var receivedMessagesCount = 0;

var server = await testEnvironment.StartServer();

var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c1"));
c1.ApplicationMessageReceivedAsync += e =>
{
Interlocked.Increment(ref receivedMessagesCount);
return PlatformAbstractionLayer.CompletedTask;
};

var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c2"));

var message = new MqttApplicationMessageBuilder().WithTopic("a").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build();
await c2.PublishAsync(message);

await Task.Delay(500);
Assert.AreEqual(0, receivedMessagesCount);

var subscribeEventCalled = false;
server.ClientSubscribedTopicAsync += e =>
{
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId;
return PlatformAbstractionLayer.CompletedTask;
};

await c1.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
await Task.Delay(250);
Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");

await c2.PublishAsync(message);
await Task.Delay(250);
Assert.AreEqual(1, receivedMessagesCount);

var unsubscribeEventCalled = false;
server.ClientUnsubscribedTopicAsync += e =>
{
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId;
return PlatformAbstractionLayer.CompletedTask;
};

await c1.UnsubscribeAsync("a");
await Task.Delay(250);
Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");

await c2.PublishAsync(message);
await Task.Delay(500);
Assert.AreEqual(1, receivedMessagesCount);
await testEnvironment.StartServer();
var client = await testEnvironment.ConnectClient();

await Task.Delay(500);
var subscribeOptions = new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a")
.WithTopicFilter("b", MqttQualityOfServiceLevel.AtLeastOnce)
.WithTopicFilter("c", MqttQualityOfServiceLevel.ExactlyOnce)
.WithTopicFilter("d")
.Build();
var response = await client.SubscribeAsync(subscribeOptions);

Assert.AreEqual(1, receivedMessagesCount);
Assert.AreEqual(subscribeOptions.TopicFilters.Count, response.Items.Count);
}
}

[TestMethod]
public async Task Subscribe_Multiple_In_Single_Request()
public async Task Subscribe_Lots_In_Multiple_Requests()
{
using (var testEnvironment = CreateTestEnvironment())
{
@@ -136,26 +123,31 @@ namespace MQTTnet.Tests.Server
Interlocked.Increment(ref receivedMessagesCount);
return PlatformAbstractionLayer.CompletedTask;
};
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("a")
.WithTopicFilter("b")
.WithTopicFilter("c")
.Build());

for (var i = 0; i < 500; i++)
{
var so = new MqttClientSubscribeOptionsBuilder().WithTopicFilter(i.ToString()).Build();

await c1.SubscribeAsync(so).ConfigureAwait(false);

await Task.Delay(10);
}

var c2 = await testEnvironment.ConnectClient();

await c2.PublishStringAsync("a");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 1);
var messageBuilder = new MqttApplicationMessageBuilder();
for (var i = 0; i < 500; i++)
{
messageBuilder.WithTopic(i.ToString());

await c2.PublishStringAsync("b");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 2);
await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);

await c2.PublishStringAsync("c");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 3);
await Task.Delay(10);
}

SpinWait.SpinUntil(() => receivedMessagesCount == 500, 5000);

Assert.AreEqual(500, receivedMessagesCount);
}
}

@@ -200,7 +192,7 @@ namespace MQTTnet.Tests.Server
}

[TestMethod]
public async Task Subscribe_Lots_In_Multiple_Requests()
public async Task Subscribe_Multiple_In_Multiple_Request()
{
using (var testEnvironment = CreateTestEnvironment())
{
@@ -215,36 +207,30 @@ namespace MQTTnet.Tests.Server
return PlatformAbstractionLayer.CompletedTask;
};

for (var i = 0; i < 500; i++)
{
var so = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(i.ToString()).Build();
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a").Build());

await c1.SubscribeAsync(so).ConfigureAwait(false);
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("b").Build());

await Task.Delay(10);
}
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("c").Build());

var c2 = await testEnvironment.ConnectClient();

var messageBuilder = new MqttApplicationMessageBuilder();
for (var i = 0; i < 500; i++)
{
messageBuilder.WithTopic(i.ToString());

await c2.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);

await Task.Delay(10);
}
await c2.PublishStringAsync("a");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 1);

SpinWait.SpinUntil(() => receivedMessagesCount == 500, 5000);
await c2.PublishStringAsync("b");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 2);

Assert.AreEqual(500, receivedMessagesCount);
await c2.PublishStringAsync("c");
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 3);
}
}

[TestMethod]
public async Task Subscribe_Multiple_In_Multiple_Request()
public async Task Subscribe_Multiple_In_Single_Request()
{
using (var testEnvironment = CreateTestEnvironment())
{
@@ -258,18 +244,8 @@ namespace MQTTnet.Tests.Server
Interlocked.Increment(ref receivedMessagesCount);
return PlatformAbstractionLayer.CompletedTask;
};
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("a")
.Build());

await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("b")
.Build());

await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter("c")
.Build());
await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("a").WithTopicFilter("b").WithTopicFilter("c").Build());

var c2 = await testEnvironment.ConnectClient();

@@ -286,31 +262,64 @@ namespace MQTTnet.Tests.Server
Assert.AreEqual(receivedMessagesCount, 3);
}
}
[TestMethod]
public async Task Deny_Invalid_Topic()
public async Task Subscribe_Unsubscribe()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
using (var testEnvironment = CreateTestEnvironment())
{
var receivedMessagesCount = 0;

var server = await testEnvironment.StartServer();

server.InterceptingSubscriptionAsync += e =>
var c1 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c1"));
c1.ApplicationMessageReceivedAsync += e =>
{
if (e.TopicFilter.Topic == "not_allowed_topic")
{
e.Response.ReasonCode = MqttSubscribeReasonCode.TopicFilterInvalid;
}
Interlocked.Increment(ref receivedMessagesCount);
return PlatformAbstractionLayer.CompletedTask;
};
var client = await testEnvironment.ConnectClient();
var subscribeResult =await client.SubscribeAsync("allowed_topic");
Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items.First().ResultCode);

subscribeResult =await client.SubscribeAsync("not_allowed_topic");
Assert.AreEqual(MqttClientSubscribeResultCode.TopicFilterInvalid, subscribeResult.Items.First().ResultCode);
var c2 = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithClientId("c2"));

var message = new MqttApplicationMessageBuilder().WithTopic("a").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build();
await c2.PublishAsync(message);

await Task.Delay(500);
Assert.AreEqual(0, receivedMessagesCount);

var subscribeEventCalled = false;
server.ClientSubscribedTopicAsync += e =>
{
subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId;
return PlatformAbstractionLayer.CompletedTask;
};

await c1.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
await Task.Delay(250);
Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");

await c2.PublishAsync(message);
await Task.Delay(250);
Assert.AreEqual(1, receivedMessagesCount);

var unsubscribeEventCalled = false;
server.ClientUnsubscribedTopicAsync += e =>
{
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId;
return PlatformAbstractionLayer.CompletedTask;
};

await c1.UnsubscribeAsync("a");
await Task.Delay(250);
Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");

await c2.PublishAsync(message);
await Task.Delay(500);
Assert.AreEqual(1, receivedMessagesCount);

await Task.Delay(500);

Assert.AreEqual(1, receivedMessagesCount);
}
}
}

Loading…
Cancel
Save