You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PerformanceTest.cs 11 KiB

7 years ago
7 years ago
6 years ago
7 years ago
7 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. using System;
  2. using System.Diagnostics;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Client.Options;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. namespace MQTTnet.TestApp.NetCore
  12. {
  13. public static class PerformanceTest
  14. {
  15. public static void RunClientOnly()
  16. {
  17. try
  18. {
  19. var options = new MqttClientOptions
  20. {
  21. ChannelOptions = new MqttClientTcpOptions
  22. {
  23. Server = "127.0.0.1"
  24. },
  25. CleanSession = true
  26. };
  27. var client = new MqttFactory().CreateMqttClient();
  28. client.ConnectAsync(options).GetAwaiter().GetResult();
  29. var message = CreateMessage();
  30. var stopwatch = new Stopwatch();
  31. for (var i = 0; i < 10; i++)
  32. {
  33. var sentMessagesCount = 0;
  34. stopwatch.Restart();
  35. while (stopwatch.ElapsedMilliseconds < 1000)
  36. {
  37. client.PublishAsync(message).GetAwaiter().GetResult();
  38. sentMessagesCount++;
  39. }
  40. Console.WriteLine($"Sending {sentMessagesCount} messages per second. #" + (i + 1));
  41. GC.Collect();
  42. }
  43. }
  44. catch (Exception exception)
  45. {
  46. Console.WriteLine(exception);
  47. }
  48. }
  49. public static void RunClientAndServer()
  50. {
  51. try
  52. {
  53. var mqttServer = new MqttFactory().CreateMqttServer();
  54. mqttServer.StartAsync(new MqttServerOptions()).GetAwaiter().GetResult();
  55. var options = new MqttClientOptions
  56. {
  57. ChannelOptions = new MqttClientTcpOptions
  58. {
  59. Server = "127.0.0.1"
  60. },
  61. CleanSession = true
  62. };
  63. var client = new MqttFactory().CreateMqttClient();
  64. client.ConnectAsync(options).GetAwaiter().GetResult();
  65. var message = CreateMessage();
  66. var stopwatch = new Stopwatch();
  67. for (var i = 0; i < 10; i++)
  68. {
  69. stopwatch.Restart();
  70. var sentMessagesCount = 0;
  71. while (stopwatch.ElapsedMilliseconds < 1000)
  72. {
  73. client.PublishAsync(message).GetAwaiter().GetResult();
  74. sentMessagesCount++;
  75. }
  76. Console.WriteLine($"Sending {sentMessagesCount} messages per second. #" + (i + 1));
  77. }
  78. }
  79. catch (Exception exception)
  80. {
  81. Console.WriteLine(exception);
  82. }
  83. }
  84. private static Task RunClientsAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
  85. {
  86. return Task.WhenAll(Enumerable.Range(0, 3).Select(i => Task.Run(() => RunClientAsync(msgChunkSize, interval, concurrent))));
  87. }
  88. private static async Task RunClientAsync(int msgChunkSize, TimeSpan interval, bool concurrent)
  89. {
  90. try
  91. {
  92. var options = new MqttClientOptions
  93. {
  94. ChannelOptions = new MqttClientTcpOptions { Server = "localhost" },
  95. ClientId = "Client1",
  96. CleanSession = true,
  97. CommunicationTimeout = TimeSpan.FromMinutes(10)
  98. };
  99. var client = new MqttFactory().CreateMqttClient();
  100. try
  101. {
  102. await client.ConnectAsync(options).ConfigureAwait(false);
  103. }
  104. catch (Exception exception)
  105. {
  106. Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
  107. }
  108. var message = CreateMessage();
  109. var stopwatch = Stopwatch.StartNew();
  110. var testMessageCount = 10000;
  111. for (var i = 0; i < testMessageCount; i++)
  112. {
  113. await client.PublishAsync(message);
  114. }
  115. stopwatch.Stop();
  116. Console.WriteLine($"Sent 10.000 messages within {stopwatch.ElapsedMilliseconds} ms ({stopwatch.ElapsedMilliseconds / (float)testMessageCount} ms / message).");
  117. var last = DateTime.Now;
  118. var msgCount = 0;
  119. while (true)
  120. {
  121. var msgs = Enumerable.Range(0, msgChunkSize)
  122. .Select(i => CreateMessage())
  123. .ToList();
  124. if (concurrent)
  125. {
  126. //send concurrent (test for raceconditions)
  127. var sendTasks = msgs
  128. .Select(msg => PublishSingleMessage(client, msg, ref msgCount))
  129. .ToList();
  130. await Task.WhenAll(sendTasks);
  131. }
  132. else
  133. {
  134. await client.PublishAsync(msgs);
  135. msgCount += msgs.Count;
  136. //send multiple
  137. }
  138. var now = DateTime.Now;
  139. if (last < now - TimeSpan.FromSeconds(1))
  140. {
  141. Console.WriteLine($"sending {msgCount} intended {msgChunkSize / interval.TotalSeconds}");
  142. msgCount = 0;
  143. last = now;
  144. }
  145. await Task.Delay(interval).ConfigureAwait(false);
  146. }
  147. }
  148. catch (Exception exception)
  149. {
  150. Console.WriteLine(exception);
  151. }
  152. }
  153. private static MqttApplicationMessage CreateMessage()
  154. {
  155. return new MqttApplicationMessage
  156. {
  157. Topic = "A/B/C",
  158. Payload = Encoding.UTF8.GetBytes("Hello World"),
  159. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
  160. };
  161. }
  162. private static Task PublishSingleMessage(IMqttClient client, MqttApplicationMessage applicationMessage, ref int count)
  163. {
  164. Interlocked.Increment(ref count);
  165. return Task.Run(() => client.PublishAsync(applicationMessage));
  166. }
  167. public static async Task RunQoS2Test()
  168. {
  169. try
  170. {
  171. var mqttServer = new MqttFactory().CreateMqttServer();
  172. await mqttServer.StartAsync(new MqttServerOptions());
  173. var options = new MqttClientOptions
  174. {
  175. ChannelOptions = new MqttClientTcpOptions
  176. {
  177. Server = "127.0.0.1"
  178. },
  179. CleanSession = true
  180. };
  181. var client = new MqttFactory().CreateMqttClient();
  182. await client.ConnectAsync(options);
  183. var message = new MqttApplicationMessage
  184. {
  185. Topic = "A/B/C",
  186. Payload = Encoding.UTF8.GetBytes("Hello World"),
  187. QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
  188. };
  189. var stopwatch = new Stopwatch();
  190. var iteration = 1;
  191. while (true)
  192. {
  193. stopwatch.Restart();
  194. var sentMessagesCount = 0;
  195. while (stopwatch.ElapsedMilliseconds < 1000)
  196. {
  197. await client.PublishAsync(message).ConfigureAwait(false);
  198. sentMessagesCount++;
  199. }
  200. Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration);
  201. iteration++;
  202. }
  203. }
  204. catch (Exception exception)
  205. {
  206. Console.WriteLine(exception);
  207. }
  208. }
  209. public static async Task RunQoS1Test()
  210. {
  211. try
  212. {
  213. var mqttServer = new MqttFactory().CreateMqttServer();
  214. await mqttServer.StartAsync(new MqttServerOptions());
  215. var options = new MqttClientOptions
  216. {
  217. ChannelOptions = new MqttClientTcpOptions
  218. {
  219. Server = "127.0.0.1"
  220. },
  221. CleanSession = true
  222. };
  223. var client = new MqttFactory().CreateMqttClient();
  224. await client.ConnectAsync(options);
  225. var message = new MqttApplicationMessage
  226. {
  227. Topic = "A/B/C",
  228. Payload = Encoding.UTF8.GetBytes("Hello World"),
  229. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
  230. };
  231. var stopwatch = new Stopwatch();
  232. var iteration = 1;
  233. while (true)
  234. {
  235. stopwatch.Restart();
  236. var sentMessagesCount = 0;
  237. while (stopwatch.ElapsedMilliseconds < 1000)
  238. {
  239. await client.PublishAsync(message).ConfigureAwait(false);
  240. sentMessagesCount++;
  241. }
  242. Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration);
  243. iteration++;
  244. }
  245. }
  246. catch (Exception exception)
  247. {
  248. Console.WriteLine(exception);
  249. }
  250. }
  251. public static async Task RunQoS0Test()
  252. {
  253. try
  254. {
  255. var mqttServer = new MqttFactory().CreateMqttServer();
  256. await mqttServer.StartAsync(new MqttServerOptions());
  257. var options = new MqttClientOptions
  258. {
  259. ChannelOptions = new MqttClientTcpOptions
  260. {
  261. Server = "127.0.0.1"
  262. },
  263. CleanSession = true
  264. };
  265. var client = new MqttFactory().CreateMqttClient();
  266. await client.ConnectAsync(options);
  267. var message = new MqttApplicationMessage
  268. {
  269. Topic = "A/B/C",
  270. Payload = Encoding.UTF8.GetBytes("Hello World"),
  271. QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
  272. };
  273. var stopwatch = new Stopwatch();
  274. var iteration = 1;
  275. while (true)
  276. {
  277. stopwatch.Restart();
  278. var sentMessagesCount = 0;
  279. while (stopwatch.ElapsedMilliseconds < 1000)
  280. {
  281. await client.PublishAsync(message).ConfigureAwait(false);
  282. sentMessagesCount++;
  283. }
  284. Console.WriteLine($"Sent {sentMessagesCount} messages in iteration #" + iteration);
  285. iteration++;
  286. }
  287. }
  288. catch (Exception exception)
  289. {
  290. Console.WriteLine(exception);
  291. }
  292. }
  293. }
  294. }