You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

PerformanceTest.cs 7.9 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago

  1. using Microsoft.Extensions.DependencyInjection;
  2. using Microsoft.Extensions.Logging;
  3. using MQTTnet.Core;
  4. using MQTTnet.Core.Client;
  5. using MQTTnet.Core.Protocol;
  6. using MQTTnet.Core.Server;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Diagnostics;
  10. using System.Linq;
  11. using System.Text;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. namespace MQTTnet.TestApp.NetCore
  15. {
  16. public static class PerformanceTest
  17. {
  /// <summary>
  /// Builds a service provider containing an MQTT server and client, asks on the
  /// console whether messages should be sent concurrently, then runs the server
  /// and one client side by side and waits for both to finish.
  /// </summary>
  /// <returns>
  /// A task that completes once both <c>RunServerAsync</c> and <c>RunClientAsync</c>
  /// have completed. The client's send loop is unbounded, so in practice this only
  /// happens after the client has failed with an exception and the server was stopped.
  /// </returns>
  public static async Task RunAsync()
  {
      var services = new ServiceCollection()
          .AddMqttServer(options =>
          {
              // Only the client id "SpecialClient" is checked against fixed credentials;
              // every other client is accepted unconditionally.
              options.ConnectionValidator = p =>
              {
                  if (p.ClientId == "SpecialClient")
                  {
                      if (p.Username != "USER" || p.Password != "PASS")
                      {
                          return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                      }
                  }

                  return MqttConnectReturnCode.ConnectionAccepted;
              };

              options.DefaultCommunicationTimeout = TimeSpan.FromMinutes(10);
          })
          .AddMqttClient()
          .AddLogging()
          .BuildServiceProvider();

      services.GetService<ILoggerFactory>().AddConsole(LogLevel.Warning, true);

      Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch.");
      var concurrent = Console.ReadKey(true).KeyChar == 'c';

      // StartNew with an async delegate yields Task<Task>: the outer task completes as
      // soon as the delegate reaches its first await. Unwrap() exposes the inner task so
      // that WhenAll below really waits for the server and the client to finish.
      var server = Task.Factory
          .StartNew(() => RunServerAsync(services), TaskCreationOptions.LongRunning)
          .Unwrap();
      var client = Task.Factory
          .StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), services, concurrent), TaskCreationOptions.LongRunning)
          .Unwrap();

      await Task.WhenAll(server, client).ConfigureAwait(false);
  }
  /// <summary>
  /// Starts three client runs in parallel, each on its own <see cref="Task.Run(Func{Task})"/>
  /// call, all sharing the same arguments.
  /// </summary>
  /// <param name="msgChunkSize">Forwarded unchanged to every <c>RunClientAsync</c> call.</param>
  /// <param name="interval">Forwarded unchanged to every <c>RunClientAsync</c> call.</param>
  /// <param name="serviceProvider">Forwarded unchanged to every <c>RunClientAsync</c> call.</param>
  /// <param name="concurrent">Forwarded unchanged to every <c>RunClientAsync</c> call.</param>
  /// <returns>A task that completes when all three client runs have completed.</returns>
  private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
  {
      const int clientCount = 3;
      var clientRuns = new List<Task>(clientCount);

      for (var index = 0; index < clientCount; index++)
      {
          clientRuns.Add(Task.Run(() => RunClientAsync(msgChunkSize, interval, serviceProvider, concurrent)));
      }

      return Task.WhenAll(clientRuns);
  }
  /// <summary>
  /// Connects one MQTT client to the server on "localhost" and runs three send phases:
  /// a fixed run of 10000 publishes, a one-second burst to estimate messages per second,
  /// and finally an endless loop that publishes chunks of messages and reports the
  /// number sent roughly once per second.
  /// </summary>
  /// <param name="msgChunkSize">Number of messages created and published per iteration of the endless loop.</param>
  /// <param name="interval">Delay awaited after each iteration of the endless loop.</param>
  /// <param name="serviceProvider">Provider from which the <c>IMqttClient</c> is resolved.</param>
  /// <param name="concurrent">
  /// When true, every message of a chunk is published through its own task
  /// (see <c>PublishSingleMessage</c>); otherwise the whole chunk is passed to a single publish call.
  /// </param>
  /// <returns>
  /// A task that completes only when an exception ends the method; the exception is
  /// written to the console rather than rethrown.
  /// </returns>
  private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
  {
      try
      {
          var options = new MqttClientOptions
          {
              ChannelOptions = new MqttClientTcpOptions { Server = "localhost" },
              ClientId = "Client1",
              CleanSession = true,
              CommunicationTimeout = TimeSpan.FromMinutes(10)
          };

          var client = serviceProvider.GetRequiredService<IMqttClient>();

          client.Connected += async (s, e) =>
          {
              Console.WriteLine("### CONNECTED WITH SERVER ###");

              // This lambda is async void: an exception escaping it cannot be observed by
              // any caller, so a failed subscribe is reported here instead of propagating.
              try
              {
                  await client.SubscribeAsync(new List<TopicFilter>
                  {
                      new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
                  });

                  Console.WriteLine("### SUBSCRIBED ###");
              }
              catch (Exception exception)
              {
                  Console.WriteLine("### SUBSCRIBING FAILED ###" + Environment.NewLine + exception);
              }
          };

          client.Disconnected += async (s, e) =>
          {
              Console.WriteLine("### DISCONNECTED FROM SERVER ###");

              // Wait a little before the single reconnect attempt.
              await Task.Delay(TimeSpan.FromSeconds(5));

              try
              {
                  await client.ConnectAsync(options);
              }
              catch
              {
                  Console.WriteLine("### RECONNECTING FAILED ###");
              }
          };

          try
          {
              await client.ConnectAsync(options);
          }
          catch (Exception exception)
          {
              Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
          }

          Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

          // Phase 1: time a fixed number of sequential publishes of the same message.
          var testMessageCount = 10000;
          var message = CreateMessage();
          var stopwatch = Stopwatch.StartNew();
          for (var i = 0; i < testMessageCount; i++)
          {
              await client.PublishAsync(message);
          }

          stopwatch.Stop();
          Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");

          // Phase 2: count how many sequential publishes fit into one second.
          stopwatch.Restart();
          var sentMessagesCount = 0;
          while (stopwatch.ElapsedMilliseconds < 1000)
          {
              await client.PublishAsync(message);
              sentMessagesCount++;
          }

          Console.WriteLine($"Sending {sentMessagesCount} messages per second.");

          // Phase 3: endless chunked sending. The report interval is measured with a
          // Stopwatch so wall-clock adjustments cannot distort it.
          var reportTimer = Stopwatch.StartNew();
          var msgCount = 0;

          while (true)
          {
              var msgs = Enumerable.Range(0, msgChunkSize)
                  .Select(i => CreateMessage())
                  .ToList();

              if (concurrent)
              {
                  //send concurrent (test for raceconditions)
                  var sendTasks = msgs
                      .Select(msg => PublishSingleMessage(client, msg, ref msgCount))
                      .ToList();

                  await Task.WhenAll(sendTasks);
              }
              else
              {
                  //send multiple
                  await client.PublishAsync(msgs);
                  msgCount += msgs.Count;
              }

              if (reportTimer.Elapsed > TimeSpan.FromSeconds(1))
              {
                  Console.WriteLine($"sending {msgCount} intended {msgChunkSize / interval.TotalSeconds}");
                  msgCount = 0;
                  reportTimer.Restart();
              }

              await Task.Delay(interval).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          Console.WriteLine(exception);
      }
  }
  /// <summary>
  /// Creates a new test message for topic "A/B/C" carrying the UTF-8 bytes of
  /// "Hello World" with quality of service level <c>AtLeastOnce</c>.
  /// </summary>
  /// <returns>A new <c>MqttApplicationMessage</c> instance on every call.</returns>
  private static MqttApplicationMessage CreateMessage()
  {
      var payloadBytes = Encoding.UTF8.GetBytes("Hello World");

      var testMessage = new MqttApplicationMessage();
      testMessage.Topic = "A/B/C";
      testMessage.Payload = payloadBytes;
      testMessage.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;

      return testMessage;
  }
  /// <summary>
  /// Atomically increments <paramref name="count"/> and then hands the publish of a
  /// single message to <see cref="Task.Run(Func{Task})"/>.
  /// </summary>
  /// <param name="client">Client used to publish the message.</param>
  /// <param name="applicationMessage">Message to publish.</param>
  /// <param name="count">Counter incremented before the publish is started.</param>
  /// <returns>The task returned by <c>Task.Run</c> for the publish call.</returns>
  private static Task PublishSingleMessage(IMqttClient client, MqttApplicationMessage applicationMessage, ref int count)
  {
      // The counter is bumped up front, so it reflects started (not completed) publishes.
      Interlocked.Increment(ref count);

      Func<Task> publish = () => client.PublishAsync(applicationMessage);
      return Task.Run(publish);
  }
  /// <summary>
  /// Resolves the MQTT server, starts it, and prints the number of received application
  /// messages about once per second until the user presses Enter, then stops the server.
  /// </summary>
  /// <param name="serviceProvider">Provider from which the <c>IMqttServer</c> is resolved.</param>
  /// <returns>
  /// A task that completes after the server was stopped (or failed) and a further line
  /// was read from the console. Exceptions are written to the console, not rethrown.
  /// </returns>
  private static async Task RunServerAsync(IServiceProvider serviceProvider)
  {
      try
      {
          var mqttServer = serviceProvider.GetRequiredService<IMqttServer>();
          var msgs = 0;
          var stopwatch = Stopwatch.StartNew();

          // NOTE(review): the counter and stopwatch are not synchronized; if the server
          // raises this event from several threads the reported numbers may be slightly
          // off - confirm against the server's threading model.
          mqttServer.ApplicationMessageReceived += (sender, args) =>
          {
              msgs++;
              if (stopwatch.ElapsedMilliseconds > 1000)
              {
                  Console.WriteLine($"received {msgs}");
                  msgs = 0;
                  stopwatch.Restart();
              }
          };

          await mqttServer.StartAsync();

          // ReadLine only returns once Enter is pressed, so the prompt says exactly that.
          Console.WriteLine("Press Enter to exit.");
          Console.ReadLine();

          await mqttServer.StopAsync();
      }
      catch (Exception e)
      {
          Console.WriteLine(e);
      }

      // Keeps the console output visible until Enter is pressed again.
      Console.ReadLine();
  }
  188. }
  189. }