You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

156 lines
5.3 KiB

  1. using System;
  2. using System.Diagnostics;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Protocol;
  9. using MQTTnet.Server;
  10. namespace MQTTnet.TestApp.NetCore
  11. {
  12. public static class PerformanceTest
  13. {
  /// <summary>
  /// Starts an in-process MQTT server, connects a single client to 127.0.0.1 and,
  /// for ten consecutive one-second windows, prints how many messages were published.
  /// </summary>
  /// <remarks>
  /// Every asynchronous call is awaited by blocking the calling thread. Any exception
  /// is written to the console instead of being propagated to the caller.
  /// </remarks>
  public static void Run()
  {
      const int rounds = 10;
      const int roundDurationMs = 1000;

      try
      {
          var mqttServer = new MqttFactory().CreateMqttServer();
          mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult();
          try
          {
              var options = new MqttClientOptions
              {
                  ChannelOptions = new MqttClientTcpOptions
                  {
                      Server = "127.0.0.1"
                  },
                  CleanSession = true
              };

              var client = new MqttFactory().CreateMqttClient();
              client.ConnectAsync(options).GetAwaiter().GetResult();
              try
              {
                  // The same message instance is published over and over.
                  var message = CreateMessage();
                  var stopwatch = new Stopwatch();

                  for (var i = 0; i < rounds; i++)
                  {
                      stopwatch.Restart();

                      var sentMessagesCount = 0;
                      while (stopwatch.ElapsedMilliseconds < roundDurationMs)
                      {
                          client.PublishAsync(message).GetAwaiter().GetResult();
                          sentMessagesCount++;
                      }

                      Console.WriteLine($"Sending {sentMessagesCount} messages per second. #{i + 1}");
                  }
              }
              finally
              {
                  // Close the connection even when publishing failed half-way.
                  client.DisconnectAsync().GetAwaiter().GetResult();
              }
          }
          finally
          {
              // Stop the server so it does not keep running after the benchmark returns.
              mqttServer.StopAsync().GetAwaiter().GetResult();
          }
      }
      catch (Exception exception)
      {
          Console.WriteLine(exception);
      }
  }
  /// <summary>
  /// Starts three instances of <see cref="RunClientAsync"/> via <see cref="Task.Run(Func{Task})"/>
  /// and returns a task that completes once all of them have completed.
  /// </summary>
  /// <param name="msgChunkSize">Forwarded unchanged to every client.</param>
  /// <param name="interval">Forwarded unchanged to every client.</param>
  /// <param name="concurrent">Forwarded unchanged to every client.</param>
  private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
  {
      var clientTasks = new Task[3];
      for (var slot = 0; slot < clientTasks.Length; slot++)
      {
          clientTasks[slot] = Task.Run(() => RunClientAsync(msgChunkSize, interval, concurrent));
      }

      return Task.WhenAll(clientTasks);
  }
  // Source of distinct client ids. RunClientsAsync starts several clients at the same
  // time, so the counter is only ever advanced through Interlocked.
  private static int _clientCounter;

  /// <summary>
  /// Connects one client to "localhost", publishes a fixed batch of messages while
  /// timing it, and then keeps publishing chunks of <paramref name="msgChunkSize"/>
  /// messages forever, pausing for <paramref name="interval"/> between chunks.
  /// </summary>
  /// <param name="msgChunkSize">Number of messages created and published per loop iteration.</param>
  /// <param name="interval">Delay awaited after every chunk.</param>
  /// <param name="concurrent">
  /// When true, each message of a chunk is published through
  /// <see cref="PublishSingleMessage"/> and the resulting tasks are awaited together;
  /// when false, the whole chunk is handed to a single PublishAsync call.
  /// </param>
  /// <returns>
  /// A task that completes only when connecting fails or an exception ends the loop;
  /// exceptions are written to the console and not rethrown.
  /// </returns>
  private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
  {
      const int testMessageCount = 10000;

      try
      {
          var options = new MqttClientOptions
          {
              ChannelOptions = new MqttClientTcpOptions { Server = "localhost" },
              // Each client needs its own id: clients sharing one id are treated by a
              // broker as the same client, so only one of them would stay connected.
              ClientId = "Client" + Interlocked.Increment(ref _clientCounter),
              CleanSession = true,
              CommunicationTimeout = TimeSpan.FromMinutes(10)
          };

          var client = new MqttFactory().CreateMqttClient();

          try
          {
              await client.ConnectAsync(options).ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);

              // Without a connection there is nothing to measure.
              return;
          }

          var message = CreateMessage();
          var stopwatch = Stopwatch.StartNew();

          for (var i = 0; i < testMessageCount; i++)
          {
              await client.PublishAsync(message).ConfigureAwait(false);
          }

          stopwatch.Stop();
          Console.WriteLine($"Sent {testMessageCount} messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");

          // Stopwatch rather than DateTime.Now: the reporting period must not be
          // affected by adjustments of the system clock.
          var reportTimer = Stopwatch.StartNew();
          var msgCount = 0;

          while (true)
          {
              var msgs = Enumerable.Range(0, msgChunkSize)
                  .Select(i => CreateMessage())
                  .ToList();

              if (concurrent)
              {
                  // Send concurrently (test for race conditions).
                  var sendTasks = msgs
                      .Select(msg => PublishSingleMessage(client, msg, ref msgCount))
                      .ToList();

                  await Task.WhenAll(sendTasks).ConfigureAwait(false);
              }
              else
              {
                  // Send the whole chunk with one call.
                  await client.PublishAsync(msgs).ConfigureAwait(false);
                  msgCount += msgs.Count;
              }

              // Report roughly once per second, then start counting afresh.
              if (reportTimer.Elapsed > TimeSpan.FromSeconds(1))
              {
                  Console.WriteLine($"sending {msgCount} intended {msgChunkSize / interval.TotalSeconds}");
                  msgCount = 0;
                  reportTimer.Restart();
              }

              await Task.Delay(interval).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          Console.WriteLine(exception);
      }
  }
  /// <summary>
  /// Builds the message used by the performance runs: topic "A/B/C", the UTF-8 bytes
  /// of "Hello World" as payload, and quality of service level AtMostOnce.
  /// </summary>
  /// <returns>A newly created message instance on every call.</returns>
  private static MqttApplicationMessage CreateMessage()
  {
      var payload = Encoding.UTF8.GetBytes("Hello World");

      var message = new MqttApplicationMessage();
      message.Topic = "A/B/C";
      message.Payload = payload;
      message.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;

      return message;
  }
  /// <summary>
  /// Atomically increments <paramref name="count"/> and then schedules the publish of
  /// <paramref name="applicationMessage"/> through <see cref="Task.Run(Func{Task})"/>.
  /// </summary>
  /// <param name="client">Client used to publish the message.</param>
  /// <param name="applicationMessage">Message to publish.</param>
  /// <param name="count">
  /// Counter of scheduled messages; it is incremented before the publish is started,
  /// so it does not reflect whether the publish succeeded.
  /// </param>
  /// <returns>The task returned by Task.Run for the publish operation.</returns>
  private static Task PublishSingleMessage(IMqttClient client, MqttApplicationMessage applicationMessage, ref int count)
  {
      Interlocked.Increment(ref count);

      // Task.Run is intentional here: the caller uses this method to publish from
      // several thread-pool work items at once.
      var publishTask = Task.Run(() => client.PublishAsync(applicationMessage));
      return publishTask;
  }
  132. }
  133. }