You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

592 lines
21 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Microsoft.VisualStudio.TestTools.UnitTesting;
  8. using MQTTnet.Client;
  9. using MQTTnet.Client.Connecting;
  10. using MQTTnet.Client.Disconnecting;
  11. using MQTTnet.Client.Options;
  12. using MQTTnet.Client.Subscribing;
  13. using MQTTnet.Exceptions;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. using MQTTnet.Tests.Mockups;
  17. namespace MQTTnet.Tests
  18. {
  19. [TestClass]
  20. public class Client_Tests
  21. {
  /// <summary>
  /// Verifies that a client can publish a reply from inside its own message handler.
  /// The client subscribes to "#", publishes "request" and waits until any message
  /// with a different topic (the "reply") is delivered back to it.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_In_Message_Handler_For_Same_Client()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var client = await testEnvironment.ConnectClientAsync();
          await client.SubscribeAsync("#");

          var replyReceived = false;

          client.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              // Every message that is not the request is treated as the reply.
              if (eventArgs.ApplicationMessage.Topic != "request")
              {
                  replyReceived = true;
                  return;
              }

              // The reply is published from a separate task and intentionally not awaited.
              _ = Task.Run(() => client.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce));
          });

          await client.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);

          // Give the handler up to 10 seconds to observe the reply.
          SpinWait.SpinUntil(() => replyReceived, TimeSpan.FromSeconds(10));

          Assert.IsTrue(replyReceived);
      }
  }
  /// <summary>
  /// Verifies that a second client can answer a request from inside its message handler.
  /// client1 publishes "request"; client2 answers with "reply"; the test passes once
  /// client1 has received the "reply" message.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_In_Message_Handler()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var client1 = await testEnvironment.ConnectClientAsync();
          var client2 = await testEnvironment.ConnectClientAsync();

          await client1.SubscribeAsync("#");
          await client2.SubscribeAsync("#");

          var replyReceived = false;

          client1.UseApplicationMessageReceivedHandler(c =>
          {
              if (c.ApplicationMessage.Topic == "reply")
              {
                  replyReceived = true;
              }
          });

          client2.UseApplicationMessageReceivedHandler(async c =>
          {
              // client2 is subscribed to "#" and therefore also receives its own "reply"
              // messages. Only answer the actual request; replying to every message
              // would make client2 answer its own replies in an endless loop.
              if (c.ApplicationMessage.Topic == "request")
              {
                  await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce);
              }
          });

          await client1.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);

          // Give client1 up to 10 seconds to observe the reply.
          SpinWait.SpinUntil(() => replyReceived, TimeSpan.FromSeconds(10));

          Assert.IsTrue(replyReceived);
      }
  }
  /// <summary>
  /// Verifies that a client can connect again after the server was stopped and restarted
  /// on the same port.
  /// </summary>
  [TestMethod]
  public async Task Reconnect()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();

          await Task.Delay(500);
          Assert.IsTrue(client.IsConnected);

          // Stopping the server must be reflected in the client's connection state.
          await server.StopAsync();
          await Task.Delay(500);
          Assert.IsFalse(client.IsConnected);

          // Bring the server back on the port used by the test environment.
          var serverOptions = new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build();
          await server.StartAsync(serverOptions);
          await Task.Delay(500);

          var clientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build();
          await client.ConnectAsync(clientOptions);

          Assert.IsTrue(client.IsConnected);
      }
  }
  /// <summary>
  /// Verifies that repeated connect attempts fail while the server is offline and that
  /// the same client instance can connect again once the server has been restarted.
  /// </summary>
  [TestMethod]
  public async Task Reconnect_While_Server_Offline()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();

          await Task.Delay(500);
          Assert.IsTrue(client.IsConnected);

          await server.StopAsync();
          await Task.Delay(500);
          Assert.IsFalse(client.IsConnected);

          for (var i = 0; i < 5; i++)
          {
              // The outcome is recorded in a flag and asserted outside of the try block.
              // Calling Assert.Fail inside the try block would be swallowed by the catch
              // and the check could never fail.
              var connectFailed = false;

              try
              {
                  await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
              }
              catch (Exception)
              {
                  // Expected: the server is stopped.
                  connectFailed = true;
              }

              Assert.IsTrue(connectFailed, "Must fail!");
          }

          await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
          await Task.Delay(500);

          await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          Assert.IsTrue(client.IsConnected);
      }
  }
  /// <summary>
  /// Verifies that a connect attempt may be started from inside the disconnected handler.
  /// No server is started in this test, so every attempt is expected to fail and to raise
  /// the handler again until <c>maxTries</c> attempts were made.
  /// </summary>
  [TestMethod]
  public async Task Reconnect_From_Disconnected_Event()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var client = testEnvironment.CreateClient();

          var tries = 0;
          var maxTries = 3;

          client.UseDisconnectedHandler(async e =>
          {
              if (tries >= maxTries)
              {
                  return;
              }

              Interlocked.Increment(ref tries);

              await Task.Delay(100);
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          });

          // The outcome is recorded in a flag and asserted outside of the try block.
          // Calling Assert.Fail inside the try block would be swallowed by the catch
          // and the check could never fail.
          var connectFailed = false;

          try
          {
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          }
          catch (Exception)
          {
              // Expected: nothing is listening on the port.
              connectFailed = true;
          }

          Assert.IsTrue(connectFailed, "Must fail!");

          // Wait up to 10 seconds for the handler to run through all retries.
          SpinWait.SpinUntil(() => tries >= maxTries, 10000);

          Assert.AreEqual(maxTries, tries);
      }
  }
  /// <summary>
  /// Verifies the packet identifier reported in the publish result: none for
  /// QoS 0 (AtMostOnce) and the values 1 to 4 for the four following QoS 1 / QoS 2 publishes.
  /// </summary>
  [TestMethod]
  public async Task PacketIdentifier_In_Publish_Result()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();

          // AtMostOnce: no packet identifier is expected.
          var atMostOnceA = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtMostOnce);
          Assert.AreEqual(null, atMostOnceA.PacketIdentifier);

          var atMostOnceB = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtMostOnce);
          Assert.AreEqual(null, atMostOnceB.PacketIdentifier);

          // AtLeastOnce: identifiers 1 and 2.
          var atLeastOnceA = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtLeastOnce);
          Assert.AreEqual((ushort)1, atLeastOnceA.PacketIdentifier);

          var atLeastOnceB = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtLeastOnce);
          Assert.AreEqual((ushort)2, atLeastOnceB.PacketIdentifier);

          // ExactlyOnce: the numbering continues with 3 and 4.
          var exactlyOnceA = await client.PublishAsync("a", "a", MqttQualityOfServiceLevel.ExactlyOnce);
          Assert.AreEqual((ushort)3, exactlyOnceA.PacketIdentifier);

          var exactlyOnceB = await client.PublishAsync("b", "b", MqttQualityOfServiceLevel.ExactlyOnce);
          Assert.AreEqual((ushort)4, exactlyOnceB.PacketIdentifier);
      }
  }
  /// <summary>
  /// Verifies that connecting to an unknown host throws a
  /// <see cref="MqttCommunicationException"/> whose inner exception is a
  /// <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Invalid_Connect_Throws_Exception()
  {
      var factory = new MqttFactory();
      using (var client = factory.CreateMqttClient())
      {
          // The exception is captured and inspected outside of the try block. With
          // Assert.Fail inside the try block, an unexpected success would be caught here
          // as well and reported as a misleading "wrong exception type" failure.
          Exception exception = null;

          try
          {
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build());
          }
          catch (Exception caught)
          {
              exception = caught;
          }

          Assert.IsNotNull(exception, "Must fail!");
          Assert.IsInstanceOfType(exception, typeof(MqttCommunicationException));
          Assert.IsInstanceOfType(exception.InnerException, typeof(SocketException));
      }
  }
  /// <summary>
  /// Verifies that the client's disconnected handler is invoked after the server was stopped.
  /// </summary>
  [TestMethod]
  public async Task Fire_Disconnected_Event_On_Server_Shutdown()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();

          var handlerFired = false;
          client.UseDisconnectedHandler(e =>
          {
              handlerFired = true;
          });

          await server.StopAsync();

          // Allow 4 seconds for the handler to run before checking the flag.
          await Task.Delay(4000);

          Assert.IsTrue(handlerFired);
      }
  }
  /// <summary>
  /// Verifies that the disconnected handler receives the exception of a failed connect:
  /// a <see cref="MqttCommunicationException"/> wrapping a <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Disconnect_Event_Contains_Exception()
  {
      var factory = new MqttFactory();
      using (var client = factory.CreateMqttClient())
      {
          Exception reportedException = null;

          client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e =>
          {
              reportedException = e.Exception;
          });

          var options = new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build();

          try
          {
              await client.ConnectAsync(options);
          }
          catch
          {
              // Ignored on purpose: this test only inspects what the handler was given.
          }

          // Give the handler some time to run.
          await Task.Delay(500);

          Assert.IsNotNull(reportedException);
          Assert.IsInstanceOfType(reportedException, typeof(MqttCommunicationException));
          Assert.IsInstanceOfType(reportedException.InnerException, typeof(SocketException));
      }
  }
  /// <summary>
  /// Verifies that messages are handed to the application handler in the order in which
  /// they were published, even if handling earlier messages takes longer than handling
  /// later ones.
  /// </summary>
  [TestMethod]
  public async Task Preserve_Message_Order()
  {
      // The messages are sent in reverse order (highest value first) so that the delay in
      // the handler is longer for the first messages. If there is an ordering issue, later
      // messages may be processed earlier.
      const int MessagesCount = 50;

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var client1 = await testEnvironment.ConnectClientAsync();
          await client1.SubscribeAsync("x");

          var receivedValues = new List<int>();

          async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());

              // The payload doubles as the handling delay in milliseconds.
              await Task.Delay(value);

              lock (receivedValues)
              {
                  receivedValues.Add(value);
              }
          }

          client1.UseApplicationMessageReceivedHandler(Handler1);

          var client2 = await testEnvironment.ConnectClientAsync();
          for (var i = MessagesCount; i > 0; i--)
          {
              await client2.PublishAsync("x", i.ToString());
          }

          await Task.Delay(5000);

          // Take the snapshot under the same lock the handler uses for writing.
          int[] actualValues;
          lock (receivedValues)
          {
              actualValues = receivedValues.ToArray();
          }

          // Check the count first so that lost messages are reported as an assertion
          // failure instead of an ArgumentOutOfRangeException from the indexer below.
          Assert.AreEqual(MessagesCount, actualValues.Length);

          for (var i = MessagesCount; i > 0; i--)
          {
              Assert.AreEqual(i, actualValues[MessagesCount - i]);
          }
      }
  }
  /// <summary>
  /// Verifies that a client which answers every "request/+" message with a message on
  /// "reply/{original topic}" produces one reply per request, in request order.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_For_Any_Received_Message()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          // Responder: republishes the topic of each request below "reply/".
          var responder = await testEnvironment.ConnectClientAsync();
          await responder.SubscribeAsync("request/+");
          responder.UseApplicationMessageReceivedHandler(async eventArgs =>
          {
              await responder.PublishAsync($"reply/{eventArgs.ApplicationMessage.Topic}");
          });

          // Requester: collects the topics of all replies.
          var requester = await testEnvironment.ConnectClientAsync();
          await requester.SubscribeAsync("reply/#");

          var replies = new List<string>();
          requester.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (replies)
              {
                  replies.Add(eventArgs.ApplicationMessage.Topic);
              }
          });

          await Task.Delay(500);

          await requester.PublishAsync("request/a");
          await requester.PublishAsync("request/b");
          await requester.PublishAsync("request/c");

          await Task.Delay(500);

          var joinedReplies = string.Join(",", replies);
          Assert.AreEqual("reply/request/a,reply/request/b,reply/request/c", joinedReplies);
      }
  }
  /// <summary>
  /// Verifies that a message published with the retain flag is delivered to a client
  /// that is already subscribed with the retain flag cleared.
  /// </summary>
  [TestMethod]
  public async Task Publish_With_Correct_Retain_Flag()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var receivedMessages = new List<MqttApplicationMessage>();

          var subscriber = await testEnvironment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (receivedMessages)
              {
                  receivedMessages.Add(eventArgs.ApplicationMessage);
              }
          });

          await subscriber.SubscribeAsync("a");

          var publisher = await testEnvironment.ConnectClientAsync();
          var retainedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithRetainFlag()
              .Build();

          await publisher.PublishAsync(retainedMessage);

          await Task.Delay(500);

          Assert.AreEqual(1, receivedMessages.Count);

          // Must be false even though the flag was set on the published message!
          Assert.IsFalse(receivedMessages.First().Retain);
      }
  }
  /// <summary>
  /// Verifies that subscribing and publishing from inside the connected handler works:
  /// the message published by the handler must be received exactly once by the same client.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_In_Callback_Events()
  {
      const string Topic = "RCU/P1/H0001/R0003";
      const string Payload = "DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|";

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var receivedMessages = new List<MqttApplicationMessage>();

          var client = testEnvironment.CreateClient();

          // Subscribe and publish as soon as the connection is established.
          client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
          {
              await client.SubscribeAsync(Topic);

              var messageBuilder = new MqttApplicationMessageBuilder()
                  .WithPayload(Payload)
                  .WithTopic(Topic);

              await client.PublishAsync(messageBuilder.Build());
          });

          client.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              lock (receivedMessages)
              {
                  receivedMessages.Add(eventArgs.ApplicationMessage);
              }
          });

          var clientOptions = new MqttClientOptionsBuilder().WithTcpServer("localhost", testEnvironment.ServerPort).Build();
          await client.ConnectAsync(clientOptions);

          await Task.Delay(500);

          Assert.AreEqual(1, receivedMessages.Count);
          Assert.AreEqual(Payload, receivedMessages.First().ConvertPayloadToString());
      }
  }
  /// <summary>
  /// Verifies that a QoS 1 message is handed to the handler a second time after the
  /// receiving client, whose handler is slow and throws, was disconnected and reconnected.
  /// </summary>
  [TestMethod]
  public async Task Message_Send_Retry()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          // Errors are expected in this scenario on both sides.
          testEnvironment.IgnoreClientLogErrors = true;
          testEnvironment.IgnoreServerLogErrors = true;

          var serverOptions = new MqttServerOptionsBuilder()
              .WithPersistentSessions()
              .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(250));

          await testEnvironment.StartServerAsync(serverOptions);

          var receiverOptions = new MqttClientOptionsBuilder().WithCleanSession(false);
          var receiver = await testEnvironment.ConnectClientAsync(receiverOptions);
          await receiver.SubscribeAsync("x", MqttQualityOfServiceLevel.AtLeastOnce);

          var handlerInvocations = 0;

          // The handler takes four times as long as the server's communication timeout
          // and then fails.
          async Task FailingHandler(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              handlerInvocations++;

              await Task.Delay(1000);
              throw new Exception("Broken!");
          }

          receiver.UseApplicationMessageReceivedHandler(FailingHandler);

          var sender = await testEnvironment.ConnectClientAsync();
          await sender.PublishAsync("x");

          await Task.Delay(3000);

          // The server should disconnect clients which are not responding.
          Assert.IsFalse(receiver.IsConnected);

          await receiver.ReconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          // One invocation before the disconnect and one after the reconnect.
          Assert.AreEqual(2, handlerInvocations);
      }
  }
  /// <summary>
  /// Verifies that connecting without this test registering a connected handler
  /// completes without an exception and leaves the client connected.
  /// </summary>
  [TestMethod]
  public async Task NoConnectedHandler_Connect_DoesNotThrowException()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          // No connected handler is registered by this test before connecting.
          var connectedClient = await testEnvironment.ConnectClientAsync();

          Assert.IsTrue(connectedClient.IsConnected);
      }
  }
  /// <summary>
  /// Verifies that disconnecting without this test registering a disconnected handler
  /// completes without an exception and leaves the client disconnected.
  /// </summary>
  [TestMethod]
  public async Task NoDisconnectedHandler_Disconnect_DoesNotThrowException()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var connectedClient = await testEnvironment.ConnectClientAsync();
          Assert.IsTrue(connectedClient.IsConnected);

          // No disconnected handler is registered by this test before disconnecting.
          await connectedClient.DisconnectAsync();

          Assert.IsFalse(connectedClient.IsConnected);
      }
  }
  /// <summary>
  /// Connects 100 clients in a row using the same client ID "a" and verifies that only
  /// the last one stays connected, that the server reports a single client and a single
  /// session, and that the surviving client can still receive messages.
  /// </summary>
  [TestMethod]
  public async Task Frequent_Connects()
  {
      const int ClientCount = 100;

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var clients = new List<IMqttClient>();
          while (clients.Count < ClientCount)
          {
              var sameIdClient = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
              clients.Add(sameIdClient);
          }

          var clientStatus = await testEnvironment.Server.GetClientStatusAsync();
          var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();

          // Only the first 98 clients are checked; clients[98] is not asserted.
          // NOTE(review): presumably this tolerates a takeover that is still in flight - confirm.
          foreach (var replacedClient in clients.Take(98))
          {
              Assert.IsFalse(replacedClient.IsConnected);
          }

          var receiveClient = clients[ClientCount - 1];
          Assert.IsTrue(receiveClient.IsConnected);

          Assert.AreEqual(1, clientStatus.Count);
          Assert.AreEqual(1, sessionStatus.Count);

          // The surviving client must still be able to subscribe and receive.
          object receivedPayload = null;
          receiveClient.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              receivedPayload = eventArgs.ApplicationMessage.ConvertPayloadToString();
          });

          await receiveClient.SubscribeAsync("x");

          var sendClient = await testEnvironment.ConnectClientAsync();
          await sendClient.PublishAsync("x", "1");

          await Task.Delay(100);

          Assert.AreEqual("1", receivedPayload);
      }
  }
  /// <summary>
  /// Verifies that a message built without a payload is delivered with its topic intact
  /// and a null payload.
  /// </summary>
  [TestMethod]
  public async Task No_Payload()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var sender = await testEnvironment.ConnectClientAsync();
          var receiver = await testEnvironment.ConnectClientAsync();

          // Only the topic is set; no payload is assigned to the message.
          var messageBuilder = new MqttApplicationMessageBuilder().WithTopic("A");

          var subscribeOptions = new MqttClientSubscribeOptions
          {
              TopicFilters = new List<TopicFilter>
              {
                  new TopicFilter { Topic = "#" }
              }
          };

          await receiver.SubscribeAsync(subscribeOptions, CancellationToken.None);

          MqttApplicationMessage receivedMessage = null;
          receiver.UseApplicationMessageReceivedHandler(eventArgs =>
          {
              receivedMessage = eventArgs.ApplicationMessage;
          });

          await sender.PublishAsync(messageBuilder.Build(), CancellationToken.None);

          await Task.Delay(1000);

          Assert.IsNotNull(receivedMessage);
          Assert.AreEqual("A", receivedMessage.Topic);
          Assert.AreEqual(null, receivedMessage.Payload);
      }
  }
  459. }
  460. }