You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

MqttClient_Tests.cs 13 KiB

5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Microsoft.VisualStudio.TestTools.UnitTesting;
  8. using MQTTnet.Client;
  9. using MQTTnet.Client.Connecting;
  10. using MQTTnet.Client.Disconnecting;
  11. using MQTTnet.Client.Options;
  12. using MQTTnet.Client.Subscribing;
  13. using MQTTnet.Exceptions;
  14. using MQTTnet.Formatter;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server;
  17. using MQTTnet.Tests.Mockups;
  18. namespace MQTTnet.Tests
  19. {
  20. [TestClass]
  21. public class Client_Tests
  22. {
  /// <summary>
  /// Verifies that connecting to the host name "wrong-server" throws a
  /// <see cref="MqttCommunicationException"/> whose inner exception is a
  /// <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Invalid_Connect_Throws_Exception()
  {
      var factory = new MqttFactory();
      using (var client = factory.CreateMqttClient())
      {
          var options = new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build();

          try
          {
              await client.ConnectAsync(options);
          }
          catch (MqttCommunicationException exception)
          {
              // Only the expected exception type is caught here; anything else
              // propagates and fails the test with its original type and stack trace.
              Assert.IsInstanceOfType(exception.InnerException, typeof(SocketException));
              return;
          }

          // Kept outside the try block so the assertion failure is not intercepted
          // by an exception handler and reported as a type mismatch.
          Assert.Fail("Must fail!");
      }
  }
  /// <summary>
  /// Verifies that the exception delivered to the disconnected handler after a failed
  /// connect is a <see cref="MqttCommunicationException"/> wrapping a
  /// <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Disconnect_Event_Contains_Exception()
  {
      var clientFactory = new MqttFactory();
      using (var mqttClient = clientFactory.CreateMqttClient())
      {
          // Filled in by the disconnected handler below.
          Exception reportedException = null;

          mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args =>
          {
              reportedException = args.Exception;
          });

          try
          {
              var unreachableServerOptions = new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build();
              await mqttClient.ConnectAsync(unreachableServerOptions);
          }
          catch
          {
              // Intentionally ignored: this test only inspects what the handler received.
          }

          Assert.IsNotNull(reportedException);
          Assert.IsInstanceOfType(reportedException, typeof(MqttCommunicationException));
          Assert.IsInstanceOfType(reportedException.InnerException, typeof(SocketException));
      }
  }
  /// <summary>
  /// Publishes the numbers <c>MessagesCount</c> down to 1 on topic "x" and verifies that the
  /// subscriber's handler records them in exactly that order, even though the handler
  /// delays for as many milliseconds as the value it received.
  /// </summary>
  [TestMethod]
  public async Task Preserve_Message_Order()
  {
      // The messages are sent in reverse order so that the handler delays longest for the
      // first messages; later messages could then be recorded earlier if there is an
      // ordering issue.
      const int MessagesCount = 50;

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var client1 = await testEnvironment.ConnectClientAsync();
          await client1.SubscribeAsync("x");

          var receivedValues = new List<int>();

          async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());
              await Task.Delay(value);

              lock (receivedValues)
              {
                  receivedValues.Add(value);
              }
          }

          client1.UseApplicationMessageReceivedHandler(Handler1);

          var client2 = await testEnvironment.ConnectClientAsync();
          for (var i = MessagesCount; i > 0; i--)
          {
              await client2.PublishAsync("x", i.ToString());
          }

          await Task.Delay(5000);

          // Copy under the same lock the handler uses so the assertions below read a
          // stable view of the list.
          int[] snapshot;
          lock (receivedValues)
          {
              snapshot = receivedValues.ToArray();
          }

          // Check the count first: a missing message should be reported as an assertion
          // failure instead of an out-of-range index in the loop below.
          Assert.AreEqual(MessagesCount, snapshot.Length);

          for (var i = MessagesCount; i > 0; i--)
          {
              Assert.AreEqual(i, snapshot[MessagesCount - i]);
          }
      }
  }
  /// <summary>
  /// Verifies that a client can publish from inside its own message handler: every
  /// message on "request/+" is answered on "reply/{topic}", and the replies arrive at
  /// the requesting client in publish order.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_For_Any_Received_Message()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          // Responder: answers every request by publishing to a derived reply topic.
          var responder = await testEnvironment.ConnectClientAsync();
          await responder.SubscribeAsync("request/+");

          async Task ReplyToRequest(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              var replyTopic = $"reply/{eventArgs.ApplicationMessage.Topic}";
              await responder.PublishAsync(replyTopic);
          }

          responder.UseApplicationMessageReceivedHandler(ReplyToRequest);

          // Requester: collects the topics of all replies it receives.
          var requester = await testEnvironment.ConnectClientAsync();
          await requester.SubscribeAsync("reply/#");

          var replyTopics = new List<string>();
          requester.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (replyTopics)
              {
                  replyTopics.Add(eventArgs.ApplicationMessage.Topic);
              }
          });

          await Task.Delay(500);

          await requester.PublishAsync("request/a");
          await requester.PublishAsync("request/b");
          await requester.PublishAsync("request/c");

          await Task.Delay(500);

          var joinedReplies = string.Join(",", replyTopics);
          Assert.AreEqual("reply/request/a,reply/request/b,reply/request/c", joinedReplies);
      }
  }
  /// <summary>
  /// Publishes a message with the retain flag set to topic "a" and verifies that an
  /// already-subscribed client receives exactly one message whose retain flag is false.
  /// </summary>
  [TestMethod]
  public async Task Publish_With_Correct_Retain_Flag()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var deliveredMessages = new List<MqttApplicationMessage>();

          var subscriber = await testEnvironment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (deliveredMessages)
              {
                  deliveredMessages.Add(eventArgs.ApplicationMessage);
              }
          });

          await subscriber.SubscribeAsync("a");

          var publisher = await testEnvironment.ConnectClientAsync();
          var retainedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithRetainFlag()
              .Build();

          await publisher.PublishAsync(retainedMessage);

          await Task.Delay(500);

          Assert.AreEqual(1, deliveredMessages.Count);

          // Must be false even if set above!
          var delivered = deliveredMessages.First();
          Assert.IsFalse(delivered.Retain);
      }
  }
  /// <summary>
  /// Verifies that a client can subscribe and publish from inside its connected handler
  /// and then receives the message it published to its own subscription.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_In_Callback_Events()
  {
      const string Topic = "RCU/P1/H0001/R0003";
      const string Payload = "DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|";

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var collectedMessages = new List<MqttApplicationMessage>();

          var mqttClient = testEnvironment.CreateClient();

          // Subscribe and publish as soon as the connection is reported as established.
          mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
          {
              await mqttClient.SubscribeAsync(Topic);

              var messageBuilder = new MqttApplicationMessageBuilder()
                  .WithPayload(Payload)
                  .WithTopic(Topic);

              await mqttClient.PublishAsync(messageBuilder.Build());
          });

          mqttClient.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (collectedMessages)
              {
                  collectedMessages.Add(eventArgs.ApplicationMessage);
              }
          });

          var connectOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", testEnvironment.ServerPort)
              .Build();

          await mqttClient.ConnectAsync(connectOptions);

          await Task.Delay(500);

          Assert.AreEqual(1, collectedMessages.Count);
          Assert.AreEqual(Payload, collectedMessages.First().ConvertPayloadToString());
      }
  }
  /// <summary>
  /// Verifies that the receive handler is invoked a second time after the subscriber
  /// reconnects, when its first invocation for an at-least-once message took longer
  /// than the server's configured communication timeout and then threw.
  /// </summary>
  [TestMethod]
  public async Task Message_Send_Retry()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          // Errors are expected in this scenario, so they must not fail the test.
          testEnvironment.IgnoreClientLogErrors = true;
          testEnvironment.IgnoreServerLogErrors = true;

          var serverOptions = new MqttServerOptionsBuilder()
              .WithPersistentSessions()
              .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(250));

          await testEnvironment.StartServerAsync(serverOptions);

          var subscriberOptions = new MqttClientOptionsBuilder().WithCleanSession(false);
          var subscriber = await testEnvironment.ConnectClientAsync(subscriberOptions);
          await subscriber.SubscribeAsync("x", MqttQualityOfServiceLevel.AtLeastOnce);

          // Counts how often the handler below has been entered.
          var handlerInvocations = 0;

          async Task FailingHandler(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              handlerInvocations++;

              // Takes longer than the 250 ms communication timeout configured above.
              await Task.Delay(1000);
              throw new Exception("Broken!");
          }

          subscriber.UseApplicationMessageReceivedHandler(FailingHandler);

          var publisher = await testEnvironment.ConnectClientAsync();
          await publisher.PublishAsync("x");

          await Task.Delay(3000);

          // The server should disconnect clients which are not responding.
          Assert.IsFalse(subscriber.IsConnected);

          await subscriber.ReconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          Assert.AreEqual(2, handlerInvocations);
      }
  }
  /// <summary>
  /// Verifies that a client connected without this test assigning a connected handler
  /// connects without throwing and reports itself as connected.
  /// </summary>
  [TestMethod]
  public async Task NoConnectedHandler_Connect_DoesNotThrowException()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var connectedClient = await testEnvironment.ConnectClientAsync();

          var isConnected = connectedClient.IsConnected;
          Assert.IsTrue(isConnected);
      }
  }
  /// <summary>
  /// Verifies that a client for which this test assigns no disconnected handler can be
  /// disconnected without throwing, and that <c>IsConnected</c> changes from true to false.
  /// </summary>
  [TestMethod]
  public async Task NoDisconnectedHandler_Disconnect_DoesNotThrowException()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var mqttClient = await testEnvironment.ConnectClientAsync();

          // Connected before the disconnect...
          Assert.IsTrue(mqttClient.IsConnected);

          await mqttClient.DisconnectAsync();

          // ...and no longer connected afterwards.
          Assert.IsFalse(mqttClient.IsConnected);
      }
  }
  /// <summary>
  /// Connects 100 clients in sequence with the same client ID "a" and verifies that the
  /// server reports a single client and a single session, that the last client is the
  /// one still connected, and that this client can still subscribe and receive a message.
  /// </summary>
  [TestMethod]
  public async Task Frequent_Connects()
  {
      const int ConnectionCount = 100;
      const int LastIndex = ConnectionCount - 1;

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var connections = new List<IMqttClient>();
          for (var attempt = 0; attempt < ConnectionCount; attempt++)
          {
              var sharedIdOptions = new MqttClientOptionsBuilder().WithClientId("a");
              var connection = await testEnvironment.ConnectClientAsync(sharedIdOptions);
              connections.Add(connection);
          }

          var clientStatus = await testEnvironment.Server.GetClientStatusAsync();
          var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();

          // NOTE(review): only indices 0..97 are checked here, so the second-to-last client
          // (index 98) is never asserted. Confirm whether this is an intentional allowance
          // or an off-by-one before tightening the bound.
          for (var index = 0; index < ConnectionCount - 2; index++)
          {
              Assert.IsFalse(connections[index].IsConnected);
          }

          Assert.IsTrue(connections[LastIndex].IsConnected);

          Assert.AreEqual(1, clientStatus.Count);
          Assert.AreEqual(1, sessionStatus.Count);

          // The surviving client must still be fully usable.
          var receiveClient = connections[LastIndex];
          object receivedPayload = null;
          receiveClient.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              receivedPayload = eventArgs.ApplicationMessage.ConvertPayloadToString();
          });

          await receiveClient.SubscribeAsync("x");

          var sendClient = await testEnvironment.ConnectClientAsync();
          await sendClient.PublishAsync("x", "1");

          await Task.Delay(100);

          Assert.AreEqual("1", receivedPayload);
      }
  }
  /// <summary>
  /// Publishes a message built with only the topic "A" and verifies that a client
  /// subscribed to "#" receives it with that topic and a null payload.
  /// </summary>
  [TestMethod]
  public async Task No_Payload()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var sender = await testEnvironment.ConnectClientAsync();
          var receiver = await testEnvironment.ConnectClientAsync();

          // Subscribe to every topic via the multi-level wildcard.
          var wildcardFilter = new TopicFilter { Topic = "#" };
          var subscribeOptions = new MqttClientSubscribeOptions
          {
              TopicFilters = new List<TopicFilter> { wildcardFilter }
          };

          await receiver.SubscribeAsync(subscribeOptions, CancellationToken.None);

          MqttApplicationMessage receivedMessage = null;
          receiver.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              receivedMessage = eventArgs.ApplicationMessage;
          });

          // No payload is configured on the builder; only the topic is set.
          var topicOnlyMessage = new MqttApplicationMessageBuilder()
              .WithTopic("A")
              .Build();

          await sender.PublishAsync(topicOnlyMessage, CancellationToken.None);

          await Task.Delay(1000);

          Assert.IsNotNull(receivedMessage);
          Assert.AreEqual("A", receivedMessage.Topic);
          Assert.AreEqual(null, receivedMessage.Payload);
      }
  }
  294. }
  295. }