You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PerformanceTest.cs 8.0 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. using Microsoft.Extensions.DependencyInjection;
  2. using Microsoft.Extensions.Logging;
  3. using MQTTnet.Core;
  4. using MQTTnet.Core.Client;
  5. using MQTTnet.Core.Protocol;
  6. using MQTTnet.Core.Server;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Diagnostics;
  10. using System.Linq;
  11. using System.Text;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. namespace MQTTnet.TestApp.NetCore
  15. {
  16. public static class PerformanceTest
  17. {
  18. public static async Task RunAsync()
  19. {
  20. var services = new ServiceCollection()
  21. .AddMqttServer(options =>
  22. {
  23. options.ConnectionValidator = p =>
  24. {
  25. if (p.ClientId == "SpecialClient")
  26. {
  27. if (p.Username != "USER" || p.Password != "PASS")
  28. {
  29. return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  30. }
  31. }
  32. return MqttConnectReturnCode.ConnectionAccepted;
  33. };
  34. options.DefaultCommunicationTimeout = TimeSpan.FromMinutes(10);
  35. })
  36. .AddMqttClient()
  37. .AddLogging()
  38. .BuildServiceProvider();
  39. //services.GetService<ILoggerFactory>().AddConsole(LogLevel.Warning, true);
  40. Console.WriteLine("Press 'c' for concurrent sends. Otherwise in one batch.");
  41. var concurrent = Console.ReadKey(true).KeyChar == 'c';
  42. var server = Task.Factory.StartNew(() => RunServerAsync(services), TaskCreationOptions.LongRunning);
  43. var client = Task.Factory.StartNew(() => RunClientAsync(2000, TimeSpan.FromMilliseconds(10), services, concurrent), TaskCreationOptions.LongRunning);
  44. await Task.WhenAll(server, client).ConfigureAwait(false);
  45. }
  46. private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
  47. {
  48. return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, serviceProvider, concurrent))));
  49. }
  50. private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, IServiceProvider serviceProvider, bool concurrent)
  51. {
  52. try
  53. {
  54. var options = new MqttClientOptions
  55. {
  56. ChannelOptions = new MqttClientTcpOptions { Server = "localhost" },
  57. ClientId = "Client1",
  58. CleanSession = true,
  59. CommunicationTimeout = TimeSpan.FromMinutes(10)
  60. };
  61. var client = serviceProvider.GetRequiredService<IMqttClient>();
  62. client.Connected += async (s, e) =>
  63. {
  64. Console.WriteLine("### CONNECTED WITH SERVER ###");
  65. await client.SubscribeAsync(new List<TopicFilter>
  66. {
  67. new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
  68. });
  69. Console.WriteLine("### SUBSCRIBED ###");
  70. };
  71. client.Disconnected += async (s, e) =>
  72. {
  73. Console.WriteLine("### DISCONNECTED FROM SERVER ###");
  74. await Task.Delay(TimeSpan.FromSeconds(5));
  75. try
  76. {
  77. await client.ConnectAsync(options);
  78. }
  79. catch
  80. {
  81. Console.WriteLine("### RECONNECTING FAILED ###");
  82. }
  83. };
  84. try
  85. {
  86. await client.ConnectAsync(options);
  87. }
  88. catch (Exception exception)
  89. {
  90. Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
  91. }
  92. Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");
  93. var testMessageCount = 10000;
  94. var message = CreateMessage();
  95. var stopwatch = Stopwatch.StartNew();
  96. for (var i = 0; i < testMessageCount; i++)
  97. {
  98. await client.PublishAsync(message);
  99. }
  100. stopwatch.Stop();
  101. Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");
  102. var messages = new[] { message };
  103. var sentMessagesCount = 0;
  104. stopwatch.Restart();
  105. while (stopwatch.ElapsedMilliseconds < 1000)
  106. {
  107. await client.PublishAsync(messages).ConfigureAwait(false);
  108. sentMessagesCount++;
  109. }
  110. Console.WriteLine($"Sending {sentMessagesCount} messages per second.");
  111. var last = DateTime.Now;
  112. var msgCount = 0;
  113. while (true)
  114. {
  115. var msgs = Enumerable.Range(0, msgChunkSize)
  116. .Select(i => CreateMessage())
  117. .ToList();
  118. if (concurrent)
  119. {
  120. //send concurrent (test for raceconditions)
  121. var sendTasks = msgs
  122. .Select(msg => PublishSingleMessage(client, msg, ref msgCount))
  123. .ToList();
  124. await Task.WhenAll(sendTasks);
  125. }
  126. else
  127. {
  128. await client.PublishAsync(msgs);
  129. msgCount += msgs.Count;
  130. //send multiple
  131. }
  132. var now = DateTime.Now;
  133. if (last < now - TimeSpan.FromSeconds(1))
  134. {
  135. Console.WriteLine($"sending {msgCount} intended {msgChunkSize / interval.TotalSeconds}");
  136. msgCount = 0;
  137. last = now;
  138. }
  139. await Task.Delay(interval).ConfigureAwait(false);
  140. }
  141. }
  142. catch (Exception exception)
  143. {
  144. Console.WriteLine(exception);
  145. }
  146. }
  147. private static MqttApplicationMessage CreateMessage()
  148. {
  149. return new MqttApplicationMessage
  150. {
  151. Topic = "A/B/C",
  152. Payload = Encoding.UTF8.GetBytes("Hello World"),
  153. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
  154. };
  155. }
  156. private static Task PublishSingleMessage(IMqttClient client, MqttApplicationMessage applicationMessage, ref int count)
  157. {
  158. Interlocked.Increment(ref count);
  159. return Task.Run(() => client.PublishAsync(applicationMessage));
  160. }
  161. private static async Task RunServerAsync(IServiceProvider serviceProvider)
  162. {
  163. try
  164. {
  165. var mqttServer = serviceProvider.GetRequiredService<IMqttServer>();
  166. var msgs = 0;
  167. var stopwatch = Stopwatch.StartNew();
  168. mqttServer.ApplicationMessageReceived += (sender, args) =>
  169. {
  170. msgs++;
  171. if (stopwatch.ElapsedMilliseconds > 1000)
  172. {
  173. Console.WriteLine($"received {msgs}");
  174. msgs = 0;
  175. stopwatch.Restart();
  176. }
  177. };
  178. await mqttServer.StartAsync();
  179. Console.WriteLine("Press any key to exit.");
  180. Console.ReadLine();
  181. await mqttServer.StopAsync();
  182. }
  183. catch (Exception e)
  184. {
  185. Console.WriteLine(e);
  186. }
  187. Console.ReadLine();
  188. }
  189. }
  190. }