|
@@ -51,23 +51,33 @@ namespace MQTTnet.Core.Tests |
|
|
public async Task MqttServer_WillMessage() |
|
|
public async Task MqttServer_WillMessage() |
|
|
{ |
|
|
{ |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); |
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); |
|
|
|
|
|
|
|
|
var services = new ServiceCollection() |
|
|
|
|
|
.AddLogging() |
|
|
|
|
|
.AddMqttServer() |
|
|
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await c2.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); |
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); |
|
|
|
|
|
|
|
|
await Task.Delay(1000); |
|
|
|
|
|
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
await c2.DisconnectAsync(); |
|
|
|
|
|
|
|
|
|
|
|
await Task.Delay(1000); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -75,33 +85,45 @@ namespace MQTTnet.Core.Tests |
|
|
public async Task MqttServer_Unsubscribe() |
|
|
public async Task MqttServer_Unsubscribe() |
|
|
{ |
|
|
{ |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var services = new ServiceCollection() |
|
|
|
|
|
.AddLogging() |
|
|
|
|
|
.AddMqttServer() |
|
|
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
|
|
|
|
|
await c1.UnsubscribeAsync("a"); |
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
|
|
|
|
|
|
await c1.UnsubscribeAsync("a"); |
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
await Task.Delay(500); |
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
@@ -111,22 +133,34 @@ namespace MQTTnet.Core.Tests |
|
|
public async Task MqttServer_Publish() |
|
|
public async Task MqttServer_Publish() |
|
|
{ |
|
|
{ |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
|
|
|
var services = new ServiceCollection() |
|
|
|
|
|
.AddLogging() |
|
|
|
|
|
.AddMqttServer() |
|
|
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
|
|
|
s.PublishAsync(message).Wait(); |
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); |
|
|
|
|
|
|
|
|
|
|
|
s.PublishAsync(message).Wait(); |
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -134,21 +168,33 @@ namespace MQTTnet.Core.Tests |
|
|
public async Task MqttServer_NoRetainedMessage() |
|
|
public async Task MqttServer_NoRetainedMessage() |
|
|
{ |
|
|
{ |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var services = new ServiceCollection() |
|
|
|
|
|
.AddLogging() |
|
|
|
|
|
.AddMqttServer() |
|
|
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
} |
|
|
} |
|
@@ -157,22 +203,34 @@ namespace MQTTnet.Core.Tests |
|
|
public async Task MqttServer_RetainedMessage() |
|
|
public async Task MqttServer_RetainedMessage() |
|
|
{ |
|
|
{ |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var s = new MqttFactory().CreateMqttServer(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var services = new ServiceCollection() |
|
|
|
|
|
.AddLogging() |
|
|
|
|
|
.AddMqttServer() |
|
|
|
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
|
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -182,26 +240,32 @@ namespace MQTTnet.Core.Tests |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var services = new ServiceCollection() |
|
|
var services = new ServiceCollection() |
|
|
.AddLogging() |
|
|
.AddLogging() |
|
|
.AddMqttServer() // TODO: Is there maybe an easier way for the library user to set the options? |
|
|
|
|
|
|
|
|
.AddMqttServer() |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.BuildServiceProvider(); |
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
var s = new MqttFactory(services).CreateMqttServer(); |
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
} |
|
|
} |
|
@@ -214,30 +278,42 @@ namespace MQTTnet.Core.Tests |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var services = new ServiceCollection() |
|
|
var services = new ServiceCollection() |
|
|
.AddLogging() |
|
|
.AddLogging() |
|
|
.AddMqttServer(options => options.Storage = storage) // TODO: Is there maybe an easier way for the library user to set the options? |
|
|
|
|
|
|
|
|
.AddMqttServer() |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.BuildServiceProvider(); |
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(); // TODO: Like here? |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
s = services.GetRequiredService<IMqttServer>(); |
|
|
s = services.GetRequiredService<IMqttServer>(); |
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
} |
|
|
} |
|
@@ -253,30 +329,37 @@ namespace MQTTnet.Core.Tests |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var services = new ServiceCollection() |
|
|
var services = new ServiceCollection() |
|
|
.AddLogging() |
|
|
.AddLogging() |
|
|
.AddMqttServer(options => options.ApplicationMessageInterceptor = Interceptor) |
|
|
|
|
|
|
|
|
.AddMqttServer() |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.BuildServiceProvider(); |
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var s = services.GetRequiredService<IMqttServer>(); |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var s = new MqttFactory(services).CreateMqttServer(options => options.ApplicationMessageInterceptor = Interceptor); |
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); |
|
|
|
|
|
|
|
|
var isIntercepted = false; |
|
|
|
|
|
c2.ApplicationMessageReceived += (sender, args) => |
|
|
|
|
|
{ |
|
|
|
|
|
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
var isIntercepted = false; |
|
|
|
|
|
c2.ApplicationMessageReceived += (sender, args) => |
|
|
|
|
|
{ |
|
|
|
|
|
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); |
|
|
|
|
|
await c1.PublishAsync(m); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); |
|
|
|
|
|
await c1.PublishAsync(m); |
|
|
|
|
|
await c1.DisconnectAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
Assert.IsTrue(isIntercepted); |
|
|
|
|
|
|
|
|
Assert.IsTrue(isIntercepted); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private class TestStorage : IMqttServerStorage |
|
|
private class TestStorage : IMqttServerStorage |
|
@@ -305,28 +388,34 @@ namespace MQTTnet.Core.Tests |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var serverAdapter = new TestMqttServerAdapter(); |
|
|
var services = new ServiceCollection() |
|
|
var services = new ServiceCollection() |
|
|
.AddMqttServer() |
|
|
.AddMqttServer() |
|
|
|
|
|
.AddLogging() |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.AddSingleton<IMqttServerAdapter>(serverAdapter) |
|
|
.BuildServiceProvider(); |
|
|
.BuildServiceProvider(); |
|
|
|
|
|
|
|
|
var s = services.GetRequiredService<IMqttServer>(); |
|
|
var s = services.GetRequiredService<IMqttServer>(); |
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
|
|
|
|
var receivedMessagesCount = 0; |
|
|
var receivedMessagesCount = 0; |
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
try |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StartAsync(); |
|
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); |
|
|
|
|
|
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); |
|
|
|
|
|
|
|
|
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); |
|
|
|
|
|
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
await c1.UnsubscribeAsync(topicFilter); |
|
|
|
|
|
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
|
|
|
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); |
|
|
|
|
|
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); |
|
|
|
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
await c1.UnsubscribeAsync(topicFilter); |
|
|
|
|
|
|
|
|
|
|
|
await Task.Delay(500); |
|
|
|
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
await s.StopAsync(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); |
|
|
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|