|
|
@@ -52,22 +52,26 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); |
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await c2.DisconnectAsync(); |
|
|
|
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); |
|
|
|
|
|
|
|
await Task.Delay(1000); |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
await c2.DisconnectAsync(); |
|
|
|
|
|
|
|
await Task.Delay(1000); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
} |
|
|
|
|
|
|
@@ -76,32 +80,38 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
await c2.PublishAsync(message); |
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
await c2.PublishAsync(message); |
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
|
|
|
|
await c1.UnsubscribeAsync("a"); |
|
|
|
await c2.PublishAsync(message); |
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
await Task.Delay(500); |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
|
|
await c1.UnsubscribeAsync("a"); |
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
await Task.Delay(500); |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
@@ -112,21 +122,27 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
s.PublishAsync(message).Wait(); |
|
|
|
await Task.Delay(500); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
|
|
|
|
s.PublishAsync(message).Wait(); |
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
} |
|
|
|
|
|
|
@@ -135,20 +151,26 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
} |
|
|
@@ -158,21 +180,27 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
} |
|
|
|
|
|
|
@@ -187,21 +215,27 @@ namespace MQTTnet.Core.Tests |
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
|
await s.StartAsync(); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
} |
|
|
@@ -219,25 +253,37 @@ namespace MQTTnet.Core.Tests |
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); // TODO: Like here? |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
s = services.GetRequiredService<IMqttServer>(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
} |
|
|
@@ -259,25 +305,32 @@ namespace MQTTnet.Core.Tests |
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
var s = services.GetRequiredService<IMqttServer>(); |
|
|
|
await s.StartAsync(); |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); |
|
|
|
|
|
|
|
var isIntercepted = false; |
|
|
|
c2.ApplicationMessageReceived += (sender, args) => |
|
|
|
{ |
|
|
|
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; |
|
|
|
}; |
|
|
|
var isIntercepted = false; |
|
|
|
c2.ApplicationMessageReceived += (sender, args) => |
|
|
|
{ |
|
|
|
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; |
|
|
|
}; |
|
|
|
|
|
|
|
var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); |
|
|
|
await c1.PublishAsync(m); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); |
|
|
|
await c1.PublishAsync(m); |
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
Assert.IsTrue(isIntercepted); |
|
|
|
Assert.IsTrue(isIntercepted); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private class TestStorage : IMqttServerStorage |
|
|
@@ -306,28 +359,34 @@ namespace MQTTnet.Core.Tests |
|
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
|
var services = new ServiceCollection() |
|
|
|
.AddMqttServer() |
|
|
|
.AddLogging() |
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
var s = services.GetRequiredService<IMqttServer>(); |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
try |
|
|
|
{ |
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); |
|
|
|
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); |
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
await c1.UnsubscribeAsync(topicFilter); |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); |
|
|
|
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); |
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
await Task.Delay(500); |
|
|
|
await c1.UnsubscribeAsync(topicFilter); |
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await s.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); |
|
|
|
} |
|
|
|
} |
|
|
|