|
|
@@ -5,6 +5,7 @@ using MQTTnet.Protocol; |
|
|
|
using MQTTnet.Server; |
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Linq; |
|
|
|
using System.Text; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
@@ -98,11 +99,11 @@ namespace MQTTnet.Core.Tests |
|
|
|
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; |
|
|
|
|
|
|
|
var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); |
|
|
|
|
|
|
|
|
|
|
|
await c2.PublishAsync(message); |
|
|
|
await Task.Delay(1000); |
|
|
|
Assert.AreEqual(0, receivedMessagesCount); |
|
|
|
|
|
|
|
|
|
|
|
var subscribeEventCalled = false; |
|
|
|
s.ClientSubscribedTopic += (_, e) => |
|
|
|
{ |
|
|
@@ -116,7 +117,7 @@ namespace MQTTnet.Core.Tests |
|
|
|
await c2.PublishAsync(message); |
|
|
|
await Task.Delay(500); |
|
|
|
Assert.AreEqual(1, receivedMessagesCount); |
|
|
|
|
|
|
|
|
|
|
|
var unsubscribeEventCalled = false; |
|
|
|
s.ClientUnsubscribedTopic += (_, e) => |
|
|
|
{ |
|
|
@@ -234,7 +235,7 @@ namespace MQTTnet.Core.Tests |
|
|
|
await c1.PublishAsync(message); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
[TestMethod] |
|
|
|
public async Task MqttServer_ShutdownDisconnectsClientsGracefully() |
|
|
|
{ |
|
|
@@ -303,6 +304,57 @@ namespace MQTTnet.Core.Tests |
|
|
|
Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled); |
|
|
|
} |
|
|
|
|
|
|
|
[TestMethod] |
|
|
|
public async Task MqttServer_LotsOfRetainedMessages() |
|
|
|
{ |
|
|
|
const int ClientCount = 100; |
|
|
|
|
|
|
|
var server = new MqttFactory().CreateMqttServer(); |
|
|
|
try |
|
|
|
{ |
|
|
|
await server.StartAsync(new MqttServerOptionsBuilder().Build()); |
|
|
|
|
|
|
|
Parallel.For( |
|
|
|
0, |
|
|
|
ClientCount, |
|
|
|
new ParallelOptions { MaxDegreeOfParallelism = 10 }, |
|
|
|
i => |
|
|
|
{ |
|
|
|
using (var client = new MqttFactory().CreateMqttClient()) |
|
|
|
{ |
|
|
|
client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()) |
|
|
|
.GetAwaiter().GetResult(); |
|
|
|
|
|
|
|
for (var j = 0; j < 10; j++) |
|
|
|
{ |
|
|
|
// Clear retained message. |
|
|
|
client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i) |
|
|
|
.WithPayload(new byte[0]).WithRetainFlag().Build()).GetAwaiter().GetResult(); |
|
|
|
|
|
|
|
// Set retained message. |
|
|
|
client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i) |
|
|
|
.WithPayload("value" + j).WithRetainFlag().Build()).GetAwaiter().GetResult(); |
|
|
|
} |
|
|
|
|
|
|
|
client.DisconnectAsync().GetAwaiter().GetResult(); |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
var retainedMessages = server.GetRetainedMessages(); |
|
|
|
|
|
|
|
Assert.AreEqual(ClientCount, retainedMessages.Count); |
|
|
|
|
|
|
|
for (var i = 0; i < ClientCount; i++) |
|
|
|
{ |
|
|
|
Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i)); |
|
|
|
} |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
await server.StopAsync(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
[TestMethod] |
|
|
|
public async Task MqttServer_RetainedMessagesFlow() |
|
|
|
{ |
|
|
@@ -568,7 +620,7 @@ namespace MQTTnet.Core.Tests |
|
|
|
|
|
|
|
await server.StartAsync(options); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var clientOptions = new MqttClientOptionsBuilder() |
|
|
|
.WithTcpServer("localhost").Build(); |
|
|
|
|
|
|
@@ -593,7 +645,7 @@ namespace MQTTnet.Core.Tests |
|
|
|
{ |
|
|
|
await client.DisconnectAsync(); |
|
|
|
await server.StopAsync(); |
|
|
|
|
|
|
|
|
|
|
|
client.Dispose(); |
|
|
|
} |
|
|
|
} |
|
|
@@ -646,6 +698,33 @@ namespace MQTTnet.Core.Tests |
|
|
|
Assert.IsFalse(connecteCalledBeforeConnectedClients, "ClientConnected was called before ClientDisconnect was called"); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
[TestMethod] |
|
|
|
public async Task MqttServer_StopAndRestart() |
|
|
|
{ |
|
|
|
var server = new MqttFactory().CreateMqttServer(); |
|
|
|
await server.StartAsync(new MqttServerOptions()); |
|
|
|
|
|
|
|
var client = new MqttFactory().CreateMqttClient(); |
|
|
|
await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); |
|
|
|
await server.StopAsync(); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
var client2 = new MqttFactory().CreateMqttClient(); |
|
|
|
await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); |
|
|
|
|
|
|
|
Assert.Fail("Connecting should fail."); |
|
|
|
} |
|
|
|
catch (Exception) |
|
|
|
{ |
|
|
|
} |
|
|
|
|
|
|
|
await server.StartAsync(new MqttServerOptions()); |
|
|
|
var client3 = new MqttFactory().CreateMqttClient(); |
|
|
|
await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build()); |
|
|
|
} |
|
|
|
|
|
|
|
private class TestStorage : IMqttServerStorage |
|
|
|
{ |
|
|
|
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>(); |
|
|
|