@@ -16,11 +16,27 @@ namespace MQTTnet.Tests | |||
queue.Enqueue("2"); | |||
queue.Enqueue("3"); | |||
Assert.AreEqual("1", await queue.DequeueAsync(CancellationToken.None)); | |||
Assert.AreEqual("2", await queue.DequeueAsync(CancellationToken.None)); | |||
Assert.AreEqual("3", await queue.DequeueAsync(CancellationToken.None)); | |||
Assert.AreEqual("1", (await queue.TryDequeueAsync(CancellationToken.None)).Item); | |||
Assert.AreEqual("2", (await queue.TryDequeueAsync(CancellationToken.None)).Item); | |||
Assert.AreEqual("3", (await queue.TryDequeueAsync(CancellationToken.None)).Item); | |||
} | |||
[TestMethod] | |||
public void Count() | |||
{ | |||
var queue = new AsyncQueue<string>(); | |||
queue.Enqueue("1"); | |||
Assert.AreEqual(1, queue.Count); | |||
queue.Enqueue("2"); | |||
Assert.AreEqual(2, queue.Count); | |||
queue.Enqueue("3"); | |||
Assert.AreEqual(3, queue.Count); | |||
} | |||
[TestMethod] | |||
public async Task Preserve_ProcessAsync() | |||
{ | |||
@@ -31,7 +47,7 @@ namespace MQTTnet.Tests | |||
{ | |||
while (sum < 6) | |||
{ | |||
sum += await queue.DequeueAsync(CancellationToken.None); | |||
sum += (await queue.TryDequeueAsync(CancellationToken.None)).Item; | |||
} | |||
}); | |||
@@ -47,5 +63,35 @@ namespace MQTTnet.Tests | |||
Assert.AreEqual(6, sum); | |||
Assert.AreEqual(TaskStatus.RanToCompletion, worker.Status); | |||
} | |||
[TestMethod] | |||
public void Dequeue_Sync() | |||
{ | |||
var queue = new AsyncQueue<string>(); | |||
queue.Enqueue("1"); | |||
queue.Enqueue("2"); | |||
queue.Enqueue("3"); | |||
Assert.AreEqual("1", queue.TryDequeue().Item); | |||
Assert.AreEqual("2", queue.TryDequeue().Item); | |||
Assert.AreEqual("3", queue.TryDequeue().Item); | |||
} | |||
[TestMethod] | |||
public void Clear() | |||
{ | |||
var queue = new AsyncQueue<string>(); | |||
queue.Enqueue("1"); | |||
queue.Enqueue("2"); | |||
queue.Enqueue("3"); | |||
queue.Clear(); | |||
Assert.AreEqual(0, queue.Count); | |||
queue.Enqueue("4"); | |||
Assert.AreEqual(1, queue.Count); | |||
Assert.AreEqual("4", queue.TryDequeue().Item); | |||
} | |||
} | |||
} |
@@ -20,8 +20,11 @@ namespace MQTTnet.Tests.Mockups | |||
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311); | |||
public event EventHandler ReadingPacketStarted; | |||
public event EventHandler ReadingPacketCompleted; | |||
public long BytesSent { get; } | |||
public long BytesReceived { get; } | |||
public Action ReadingPacketStartedCallback { get; set; } | |||
public Action ReadingPacketCompletedCallback { get; set; } | |||
public void Dispose() | |||
{ | |||
@@ -1,5 +1,4 @@ | |||
using System; | |||
using System.Linq; | |||
using System.Linq; | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Client.Options; | |||
@@ -148,7 +147,9 @@ namespace MQTTnet.Tests | |||
var clientStatus = await server.GetClientStatusAsync(); | |||
Assert.AreEqual(i, clientStatus.First().SentApplicationMessagesCount, "SAMC invalid!"); | |||
Assert.AreEqual(i, clientStatus.First().SentPacketsCount, "SPC invalid!"); | |||
// + 1 because CONNECT is also counted. | |||
Assert.AreEqual(i + 1, clientStatus.First().SentPacketsCount, "SPC invalid!"); | |||
// +1 because ConnACK package is already counted. | |||
Assert.AreEqual(i + 1, clientStatus.First().ReceivedPacketsCount, "RPC invalid!"); | |||
@@ -160,7 +160,7 @@ namespace MQTTnet.TestApp.NetCore | |||
msgCount += msgs.Count; | |||
//send multiple | |||
} | |||
var now = DateTime.Now; | |||
if (last < now - TimeSpan.FromSeconds(1)) | |||
{ | |||
@@ -193,5 +193,158 @@ namespace MQTTnet.TestApp.NetCore | |||
Interlocked.Increment(ref count); | |||
return Task.Run(() => client.PublishAsync(applicationMessage)); | |||
} | |||
public static async Task RunQoS2Test() | |||
{ | |||
try | |||
{ | |||
var mqttServer = new MqttFactory().CreateMqttServer(); | |||
await mqttServer.StartAsync(new MqttServerOptions()); | |||
var options = new MqttClientOptions | |||
{ | |||
ChannelOptions = new MqttClientTcpOptions | |||
{ | |||
Server = "127.0.0.1" | |||
}, | |||
CleanSession = true | |||
}; | |||
var client = new MqttFactory().CreateMqttClient(); | |||
await client.ConnectAsync(options); | |||
var message = new MqttApplicationMessage | |||
{ | |||
Topic = "A/B/C", | |||
Payload = Encoding.UTF8.GetBytes("Hello World"), | |||
QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce | |||
}; | |||
var stopwatch = new Stopwatch(); | |||
var iteration = 1; | |||
while (true) | |||
{ | |||
stopwatch.Restart(); | |||
var sentMessagesCount = 0; | |||
while (stopwatch.ElapsedMilliseconds < 1000) | |||
{ | |||
await client.PublishAsync(message).ConfigureAwait(false); | |||
sentMessagesCount++; | |||
} | |||
Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); | |||
iteration++; | |||
} | |||
} | |||
catch (Exception exception) | |||
{ | |||
Console.WriteLine(exception); | |||
} | |||
} | |||
public static async Task RunQoS1Test() | |||
{ | |||
try | |||
{ | |||
var mqttServer = new MqttFactory().CreateMqttServer(); | |||
await mqttServer.StartAsync(new MqttServerOptions()); | |||
var options = new MqttClientOptions | |||
{ | |||
ChannelOptions = new MqttClientTcpOptions | |||
{ | |||
Server = "127.0.0.1" | |||
}, | |||
CleanSession = true | |||
}; | |||
var client = new MqttFactory().CreateMqttClient(); | |||
await client.ConnectAsync(options); | |||
var message = new MqttApplicationMessage | |||
{ | |||
Topic = "A/B/C", | |||
Payload = Encoding.UTF8.GetBytes("Hello World"), | |||
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce | |||
}; | |||
var stopwatch = new Stopwatch(); | |||
var iteration = 1; | |||
while (true) | |||
{ | |||
stopwatch.Restart(); | |||
var sentMessagesCount = 0; | |||
while (stopwatch.ElapsedMilliseconds < 1000) | |||
{ | |||
await client.PublishAsync(message).ConfigureAwait(false); | |||
sentMessagesCount++; | |||
} | |||
Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); | |||
iteration++; | |||
} | |||
} | |||
catch (Exception exception) | |||
{ | |||
Console.WriteLine(exception); | |||
} | |||
} | |||
public static async Task RunQoS0Test() | |||
{ | |||
try | |||
{ | |||
var mqttServer = new MqttFactory().CreateMqttServer(); | |||
await mqttServer.StartAsync(new MqttServerOptions()); | |||
var options = new MqttClientOptions | |||
{ | |||
ChannelOptions = new MqttClientTcpOptions | |||
{ | |||
Server = "127.0.0.1" | |||
}, | |||
CleanSession = true | |||
}; | |||
var client = new MqttFactory().CreateMqttClient(); | |||
await client.ConnectAsync(options); | |||
var message = new MqttApplicationMessage | |||
{ | |||
Topic = "A/B/C", | |||
Payload = Encoding.UTF8.GetBytes("Hello World"), | |||
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce | |||
}; | |||
var stopwatch = new Stopwatch(); | |||
var iteration = 1; | |||
while (true) | |||
{ | |||
stopwatch.Restart(); | |||
var sentMessagesCount = 0; | |||
while (stopwatch.ElapsedMilliseconds < 1000) | |||
{ | |||
await client.PublishAsync(message).ConfigureAwait(false); | |||
sentMessagesCount++; | |||
} | |||
Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration); | |||
iteration++; | |||
} | |||
} | |||
catch (Exception exception) | |||
{ | |||
Console.WriteLine(exception); | |||
} | |||
} | |||
} | |||
} |
@@ -25,6 +25,9 @@ namespace MQTTnet.TestApp.NetCore | |||
Console.WriteLine("7 = Client flow test"); | |||
Console.WriteLine("8 = Start performance test (client only)"); | |||
Console.WriteLine("9 = Start server (no trace)"); | |||
Console.WriteLine("a = Start QoS 2 benchmark"); | |||
Console.WriteLine("b = Start QoS 1 benchmark"); | |||
Console.WriteLine("c = Start QoS 0 benchmark"); | |||
var pressedKey = Console.ReadKey(true); | |||
if (pressedKey.KeyChar == '1') | |||
@@ -66,7 +69,19 @@ namespace MQTTnet.TestApp.NetCore | |||
ServerTest.RunEmptyServer(); | |||
return; | |||
} | |||
else if (pressedKey.KeyChar == 'a') | |||
{ | |||
Task.Run(PerformanceTest.RunQoS2Test); | |||
} | |||
else if (pressedKey.KeyChar == 'b') | |||
{ | |||
Task.Run(PerformanceTest.RunQoS1Test); | |||
} | |||
else if (pressedKey.KeyChar == 'c') | |||
{ | |||
Task.Run(PerformanceTest.RunQoS0Test); | |||
} | |||
Thread.Sleep(Timeout.Infinite); | |||
} | |||