You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

746 lines
27 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Client.Connecting;
  4. using MQTTnet.Client.Disconnecting;
  5. using MQTTnet.Client.Options;
  6. using MQTTnet.Client.Subscribing;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Protocol;
  9. using MQTTnet.Server;
  10. using MQTTnet.Tests.Mockups;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Net.Sockets;
  15. using System.Text;
  16. using System.Threading;
  17. using System.Threading.Tasks;
  18. namespace MQTTnet.Tests
  19. {
  20. [TestClass]
  21. public class Client_Tests
  22. {
  // Test context that most tests in this class hand to the TestEnvironment they create.
  // NOTE(review): presumably populated by the MSTest runner before each test - confirm.
  public TestContext TestContext { get; set; }
  /// <summary>
  /// Verifies that the disconnected event reports <c>ClientWasConnected == true</c>
  /// when the server is stopped while the client is connected.
  /// </summary>
  [TestMethod]
  public async Task Set_ClientWasConnected_On_ServerDisconnect()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();
          Assert.IsTrue(client.IsConnected);

          // Record the reported value instead of asserting inside the handler.
          // With the assertion inside the callback the test also passed when the
          // handler was never invoked, and a failure raised in the callback is not
          // guaranteed to reach the test method.
          bool? clientWasConnected = null;
          client.UseDisconnectedHandler(e => clientWasConnected = e.ClientWasConnected);

          await server.StopAsync();

          // Give the client time to notice the closed connection and raise the event.
          await Task.Delay(4000);

          Assert.IsTrue(clientWasConnected.HasValue, "The disconnected handler was not invoked.");
          Assert.IsTrue(clientWasConnected.Value);
      }
  }
  /// <summary>
  /// Verifies that the disconnected event reports <c>ClientWasConnected == true</c>
  /// when the client itself closes an established connection.
  /// </summary>
  [TestMethod]
  public async Task Set_ClientWasConnected_On_ClientDisconnect()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();
          Assert.IsTrue(client.IsConnected);

          // Record the reported value instead of asserting inside the handler.
          // With the assertion inside the callback the test also passed when the
          // handler was never invoked.
          bool? clientWasConnected = null;
          client.UseDisconnectedHandler(e => clientWasConnected = e.ClientWasConnected);

          await client.DisconnectAsync();

          // Short grace period in case the handler is raised asynchronously.
          await Task.Delay(200);

          Assert.IsTrue(clientWasConnected.HasValue, "The disconnected handler was not invoked.");
          Assert.IsTrue(clientWasConnected.Value);
      }
  }
  /// <summary>
  /// Connecting to the address "1.2.3.4" with a 2 second communication timeout
  /// is expected to end in a <see cref="MqttCommunicationTimedOutException"/>.
  /// </summary>
  [TestMethod]
  [ExpectedException(typeof(MqttCommunicationTimedOutException))]
  public async Task Connect_To_Invalid_Server_Wrong_IP()
  {
      // Dispose the client even though the connect attempt throws, matching the
      // using-pattern the other factory-based tests in this class follow.
      using (var client = new MqttFactory().CreateMqttClient())
      {
          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("1.2.3.4")
              .WithCommunicationTimeout(TimeSpan.FromSeconds(2))
              .Build();

          await client.ConnectAsync(options);
      }
  }
  /// <summary>
  /// Connecting to 127.0.0.1:12345 (no server is started by this test) is expected
  /// to end in a <see cref="MqttCommunicationException"/>.
  /// </summary>
  [TestMethod]
  [ExpectedException(typeof(MqttCommunicationException))]
  public async Task Connect_To_Invalid_Server_Port_Not_Opened()
  {
      // Dispose the client even though the connect attempt throws, matching the
      // using-pattern the other factory-based tests in this class follow.
      using (var client = new MqttFactory().CreateMqttClient())
      {
          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("127.0.0.1", 12345)
              .WithCommunicationTimeout(TimeSpan.FromSeconds(5))
              .Build();

          await client.ConnectAsync(options);
      }
  }
  /// <summary>
  /// Passing a URI-style host ("http://127.0.0.1") as the TCP server name is expected
  /// to end in a <see cref="MqttCommunicationException"/>.
  /// </summary>
  [TestMethod]
  [ExpectedException(typeof(MqttCommunicationException))]
  public async Task Connect_To_Invalid_Server_Wrong_Protocol()
  {
      // Dispose the client even though the connect attempt throws, matching the
      // using-pattern the other factory-based tests in this class follow.
      using (var client = new MqttFactory().CreateMqttClient())
      {
          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("http://127.0.0.1", 12345)
              .WithCommunicationTimeout(TimeSpan.FromSeconds(2))
              .Build();

          await client.ConnectAsync(options);
      }
  }
  /// <summary>
  /// Starts a server, connects a client and issues one explicit ping.
  /// The test passes when the ping completes without throwing.
  /// </summary>
  [TestMethod]
  public async Task Send_Manual_Ping()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var pingingClient = await environment.ConnectClientAsync();

          // No cancellation is requested for the ping.
          var noCancellation = CancellationToken.None;
          await pingingClient.PingAsync(noCancellation);
      }
  }
  /// <summary>
  /// A single client subscribed to "#" publishes "request"; its own message handler
  /// answers with "reply" and the test waits until that reply is seen by the same client.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_In_Message_Handler_For_Same_Client()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var loopbackClient = await environment.ConnectClientAsync();
          await loopbackClient.SubscribeAsync("#");

          var gotReply = false;
          loopbackClient.UseApplicationMessageReceivedHandler(received =>
          {
              // Anything that is not the request counts as the reply.
              if (received.ApplicationMessage.Topic != "request")
              {
                  gotReply = true;
                  return;
              }

              // The reply is published from a separate task and not awaited here.
#pragma warning disable 4014
              Task.Run(() => loopbackClient.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtLeastOnce));
#pragma warning restore 4014
          });

          await loopbackClient.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);

          // Wait up to 10 seconds for the handler to observe the reply.
          var replyTimeout = TimeSpan.FromSeconds(10);
          SpinWait.SpinUntil(() => gotReply, replyTimeout);

          Assert.IsTrue(gotReply);
      }
  }
  /// <summary>
  /// Two clients subscribe to "#". Client 1 publishes "request", client 2 answers
  /// from inside its message handler with "reply", and the test checks that client 1
  /// receives that reply.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_In_Message_Handler()
  {
      // Pass the TestContext like every other test in this class does.
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          await testEnvironment.StartServerAsync();
          var client1 = await testEnvironment.ConnectClientAsync();
          var client2 = await testEnvironment.ConnectClientAsync();

          await client1.SubscribeAsync("#");
          await client2.SubscribeAsync("#");

          var replyReceived = false;

          client1.UseApplicationMessageReceivedHandler(c =>
          {
              if (c.ApplicationMessage.Topic == "reply")
              {
                  replyReceived = true;
              }
          });

          client2.UseApplicationMessageReceivedHandler(async c =>
          {
              // client2 is subscribed to "#" and therefore also receives its own "reply".
              // Answer only the request; replying to every message made the client
              // respond to its own replies over and over until the test ended.
              if (c.ApplicationMessage.Topic != "request")
              {
                  return;
              }

              // Use AtMostOnce here because with QoS 1 or even QoS 2 the process waits for
              // the ACK etc. The problem is that the SpinUntil below only waits until the
              // flag is set. It does not wait until the client has sent the ACK.
              await client2.PublishAsync("reply", null, MqttQualityOfServiceLevel.AtMostOnce);
          });

          await client1.PublishAsync("request", null, MqttQualityOfServiceLevel.AtLeastOnce);
          await Task.Delay(500);

          SpinWait.SpinUntil(() => replyReceived, TimeSpan.FromSeconds(10));

          Assert.IsTrue(replyReceived);
      }
  }
  /// <summary>
  /// Stops the server under a connected client, restarts it on the same port and
  /// checks that the client can connect again.
  /// </summary>
  [TestMethod]
  public async Task Reconnect()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var broker = await environment.StartServerAsync();
          var mqttClient = await environment.ConnectClientAsync();

          await Task.Delay(500);
          Assert.IsTrue(mqttClient.IsConnected);

          // Take the server away; the client is expected to notice within 500 ms.
          await broker.StopAsync();
          await Task.Delay(500);
          Assert.IsFalse(mqttClient.IsConnected);

          // Bring the server back on the port the environment uses.
          var restartOptions = new MqttServerOptionsBuilder()
              .WithDefaultEndpointPort(environment.ServerPort)
              .Build();
          await broker.StartAsync(restartOptions);
          await Task.Delay(500);

          var reconnectOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("127.0.0.1", environment.ServerPort)
              .Build();
          await mqttClient.ConnectAsync(reconnectOptions);

          Assert.IsTrue(mqttClient.IsConnected);
      }
  }
  /// <summary>
  /// Stops the server, verifies that five consecutive connect attempts fail while it
  /// is offline, then restarts the server and verifies the client can connect again.
  /// </summary>
  [TestMethod]
  public async Task Reconnect_While_Server_Offline()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var server = await testEnvironment.StartServerAsync();
          var client = await testEnvironment.ConnectClientAsync();

          await Task.Delay(500);
          Assert.IsTrue(client.IsConnected);

          await server.StopAsync();
          await Task.Delay(500);
          Assert.IsFalse(client.IsConnected);

          for (var i = 0; i < 5; i++)
          {
              // Track the outcome with a flag and assert outside the try block.
              // An Assert.Fail placed inside the try was swallowed by the catch-all
              // handler, so an unexpected successful connect could never fail the test.
              var connectFailed = false;
              try
              {
                  await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
              }
              catch (Exception)
              {
                  connectFailed = true;
              }

              Assert.IsTrue(connectFailed, "Must fail!");
          }

          await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
          await Task.Delay(500);

          await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          Assert.IsTrue(client.IsConnected);
      }
  }
  /// <summary>
  /// Without any server running, the disconnected handler tries to reconnect. The test
  /// checks that the initial connect fails and that the handler performed exactly
  /// <c>maxTries</c> reconnect attempts.
  /// </summary>
  [TestMethod]
  public async Task Reconnect_From_Disconnected_Event()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var client = testEnvironment.CreateClient();

          var tries = 0;
          var maxTries = 3;

          client.UseDisconnectedHandler(async e =>
          {
              // Stop retrying once the budget is used up.
              if (tries >= maxTries)
              {
                  return;
              }

              Interlocked.Increment(ref tries);

              await Task.Delay(100);
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          });

          // Track the outcome with a flag and assert outside the try block.
          // An Assert.Fail placed inside the try was swallowed by the catch-all
          // handler, so an unexpected successful connect could never fail the test.
          var initialConnectFailed = false;
          try
          {
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort).Build());
          }
          catch (Exception)
          {
              initialConnectFailed = true;
          }

          Assert.IsTrue(initialConnectFailed, "Must fail!");

          // Wait up to 10 seconds for the handler to use all of its attempts.
          SpinWait.SpinUntil(() => tries >= maxTries, 10000);

          Assert.AreEqual(maxTries, tries);
      }
  }
  /// <summary>
  /// Checks the packet identifier reported in publish results: none for QoS 0
  /// publishes, then 1, 2, 3, 4 for the following QoS 1 and QoS 2 publishes.
  /// </summary>
  [TestMethod]
  public async Task PacketIdentifier_In_Publish_Result()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();
          var publisher = await environment.ConnectClientAsync();

          // QoS 0: the result carries no packet identifier.
          var firstAtMostOnce = await publisher.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtMostOnce);
          Assert.IsNull(firstAtMostOnce.PacketIdentifier);

          var secondAtMostOnce = await publisher.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtMostOnce);
          Assert.IsNull(secondAtMostOnce.PacketIdentifier);

          // QoS 1: identifiers start at 1 and increase by one per publish.
          var firstAtLeastOnce = await publisher.PublishAsync("a", "a", MqttQualityOfServiceLevel.AtLeastOnce);
          Assert.AreEqual((ushort)1, firstAtLeastOnce.PacketIdentifier);

          var secondAtLeastOnce = await publisher.PublishAsync("b", "b", MqttQualityOfServiceLevel.AtLeastOnce);
          Assert.AreEqual((ushort)2, secondAtLeastOnce.PacketIdentifier);

          // QoS 2: the same counter keeps running.
          var firstExactlyOnce = await publisher.PublishAsync("a", "a", MqttQualityOfServiceLevel.ExactlyOnce);
          Assert.AreEqual((ushort)3, firstExactlyOnce.PacketIdentifier);

          var secondExactlyOnce = await publisher.PublishAsync("b", "b", MqttQualityOfServiceLevel.ExactlyOnce);
          Assert.AreEqual((ushort)4, secondExactlyOnce.PacketIdentifier);
      }
  }
  /// <summary>
  /// Connecting to the host name "wrong-server" must throw a
  /// <see cref="MqttCommunicationException"/> whose inner exception is a
  /// <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Invalid_Connect_Throws_Exception()
  {
      var factory = new MqttFactory();
      using (var client = factory.CreateMqttClient())
      {
          // Catch only the expected exception type and assert outside the try block.
          // With Assert.Fail inside a try/catch (Exception), an unexpected successful
          // connect was reported as a confusing "wrong exception type" failure.
          // Any other exception type now propagates and fails the test directly.
          MqttCommunicationException connectException = null;
          try
          {
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build());
          }
          catch (MqttCommunicationException exception)
          {
              connectException = exception;
          }

          Assert.IsNotNull(connectException, "Must fail!");
          Assert.IsInstanceOfType(connectException.InnerException, typeof(SocketException));
      }
  }
  /// <summary>
  /// Connecting to the address "1.2.3.4" must throw a
  /// <see cref="MqttCommunicationException"/> and must also invoke the
  /// disconnected handler.
  /// </summary>
  [TestMethod]
  public async Task ConnectTimeout_Throws_Exception()
  {
      var factory = new MqttFactory();
      using (var client = factory.CreateMqttClient())
      {
          var disconnectHandlerCalled = false;
          client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args =>
          {
              disconnectHandlerCalled = true;
          });

          // Catch only the expected exception type and assert outside the try block.
          // With Assert.Fail inside a try/catch (Exception), an unexpected successful
          // connect was reported as a confusing "wrong exception type" failure.
          // The inner exception type is not asserted here (that check was already
          // commented out in the previous version of this test).
          MqttCommunicationException connectException = null;
          try
          {
              await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("1.2.3.4").Build());
          }
          catch (MqttCommunicationException exception)
          {
              connectException = exception;
          }

          Assert.IsNotNull(connectException, "Must fail!");

          await Task.Delay(100); // disconnected handler is called async

          Assert.IsTrue(disconnectHandlerCalled);
      }
  }
  /// <summary>
  /// Stops the server while a client is connected and checks that the client's
  /// disconnected handler has been invoked four seconds later.
  /// </summary>
  [TestMethod]
  public async Task Fire_Disconnected_Event_On_Server_Shutdown()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var broker = await environment.StartServerAsync();
          var mqttClient = await environment.ConnectClientAsync();

          var disconnectedRaised = false;
          mqttClient.UseDisconnectedHandler(args =>
          {
              disconnectedRaised = true;
          });

          await broker.StopAsync();

          // Fixed wait for the client to notice the shutdown.
          await Task.Delay(4000);

          Assert.IsTrue(disconnectedRaised);
      }
  }
  /// <summary>
  /// After a failed connect to "wrong-server", the disconnected event must carry a
  /// <see cref="MqttCommunicationException"/> wrapping a <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Disconnect_Event_Contains_Exception()
  {
      using (var mqttClient = new MqttFactory().CreateMqttClient())
      {
          Exception reportedException = null;

          mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args =>
          {
              reportedException = args.Exception;
          });

          try
          {
              await mqttClient.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("wrong-server").Build());
          }
          catch
          {
              // The thrown exception is ignored on purpose; only the exception
              // delivered through the disconnected event is checked below.
          }

          // Leave time for the handler to run.
          await Task.Delay(500);

          Assert.IsNotNull(reportedException);
          Assert.IsInstanceOfType(reportedException, typeof(MqttCommunicationException));
          Assert.IsInstanceOfType(reportedException.InnerException, typeof(SocketException));
      }
  }
  /// <summary>
  /// Publishes the numbers 50 down to 1 and checks that a subscriber whose handler
  /// delays by the received value (in milliseconds) still records them in
  /// publish order.
  /// </summary>
  [TestMethod]
  public async Task Preserve_Message_Order()
  {
      // The messages are sent in reverse order to ensure that the delay in the handler
      // needs longer for the first messages and later messages may be processed earlier
      // (if there is an issue).
      const int MessagesCount = 50;

      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          await testEnvironment.StartServerAsync();

          var client1 = await testEnvironment.ConnectClientAsync();
          await client1.SubscribeAsync("x");

          var receivedValues = new List<int>();

          async Task Handler1(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              var value = int.Parse(eventArgs.ApplicationMessage.ConvertPayloadToString());
              await Task.Delay(value);

              lock (receivedValues)
              {
                  receivedValues.Add(value);
              }
          }

          client1.UseApplicationMessageReceivedHandler(Handler1);

          var client2 = await testEnvironment.ConnectClientAsync();
          for (var i = MessagesCount; i > 0; i--)
          {
              await client2.PublishAsync("x", i.ToString());
          }

          await Task.Delay(5000);

          // Take a snapshot under the same lock the handler uses.
          int[] snapshot;
          lock (receivedValues)
          {
              snapshot = receivedValues.ToArray();
          }

          // Check the count first so that missing messages are reported as a clear
          // assertion failure instead of an index-out-of-range exception below.
          Assert.AreEqual(MessagesCount, snapshot.Length, "Not all published messages were received.");

          for (var i = MessagesCount; i > 0; i--)
          {
              Assert.AreEqual(i, snapshot[MessagesCount - i]);
          }
      }
  }
  /// <summary>
  /// One client answers every "request/+" message with "reply/{topic}"; a second client
  /// sends three requests and must receive the three replies in the order a, b, c.
  /// </summary>
  [TestMethod]
  public async Task Send_Reply_For_Any_Received_Message()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          // Responder: echoes the request topic back below "reply/".
          var responder = await environment.ConnectClientAsync();
          await responder.SubscribeAsync("request/+");
          responder.UseApplicationMessageReceivedHandler(async request =>
          {
              await responder.PublishAsync($"reply/{request.ApplicationMessage.Topic}");
          });

          // Requester: collects the topics of all replies it sees.
          var requester = await environment.ConnectClientAsync();
          await requester.SubscribeAsync("reply/#");

          var replyTopics = new List<string>();
          requester.UseApplicationMessageReceivedHandler(reply =>
          {
              lock (replyTopics)
              {
                  replyTopics.Add(reply.ApplicationMessage.Topic);
              }
          });

          await Task.Delay(500);

          await requester.PublishAsync("request/a");
          await requester.PublishAsync("request/b");
          await requester.PublishAsync("request/c");

          await Task.Delay(500);

          var joinedTopics = string.Join(",", replyTopics);
          Assert.AreEqual("reply/request/a,reply/request/b,reply/request/c", joinedTopics);
      }
  }
  /// <summary>
  /// Publishes a message with the retain flag set and checks that an already
  /// subscribed client receives exactly one message whose retain flag is false.
  /// </summary>
  [TestMethod]
  public async Task Publish_With_Correct_Retain_Flag()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var deliveredMessages = new List<MqttApplicationMessage>();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (deliveredMessages)
              {
                  deliveredMessages.Add(args.ApplicationMessage);
              }
          });
          await subscriber.SubscribeAsync("a");

          var publisher = await environment.ConnectClientAsync();
          var retainedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retainedMessage);

          await Task.Delay(500);

          Assert.AreEqual(1, deliveredMessages.Count);

          // Must be false even though the publisher set the flag.
          Assert.IsFalse(deliveredMessages[0].Retain);
      }
  }
  /// <summary>
  /// Client 1 publishes a QoS 1 message from inside its receive handler for every
  /// message it gets; two triggers from client 3 must therefore produce two
  /// "hello client2" messages at client 2.
  /// </summary>
  [TestMethod]
  public async Task Publish_QoS_1_In_ApplicationMessageReceiveHandler()
  {
      const string TriggerTopic = "client1/topic";
      const string ForwardTopic = "client2/topic";
      const string ForwardedPayload = "hello client2";

      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          // Forwarder: answers every received message with a QoS 1 publish.
          var forwarder = await environment.ConnectClientAsync();
          forwarder.UseApplicationMessageReceivedHandler(async args =>
          {
              await forwarder.PublishAsync(ForwardTopic, ForwardedPayload, MqttQualityOfServiceLevel.AtLeastOnce);
          });
          await forwarder.SubscribeAsync(TriggerTopic, MqttQualityOfServiceLevel.AtLeastOnce);

          // Sink: records the payload text of everything it receives.
          var sink = await environment.ConnectClientAsync();
          var sinkPayloads = new List<string>();
          sink.UseApplicationMessageReceivedHandler(args =>
          {
              var text = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              sinkPayloads.Add(text);
          });
          await sink.SubscribeAsync(ForwardTopic);

          // Trigger: sends the same payload-less message twice.
          var trigger = await environment.ConnectClientAsync();
          var triggerMessage = new MqttApplicationMessageBuilder().WithTopic(TriggerTopic).Build();
          await trigger.PublishAsync(triggerMessage);
          await trigger.PublishAsync(triggerMessage);

          await Task.Delay(500);

          Assert.AreEqual(2, sinkPayloads.Count);
          Assert.AreEqual(ForwardedPayload, sinkPayloads[0]);
          Assert.AreEqual(ForwardedPayload, sinkPayloads[1]);
      }
  }
  /// <summary>
  /// Subscribes and publishes from inside the connected handler and checks that the
  /// client receives its own message exactly once with an unchanged payload.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_In_Callback_Events()
  {
      const string Topic = "RCU/P1/H0001/R0003";
      const string Payload = "DA|18RS00SC00XI0000RV00R100R200R300R400L100L200L300L400Y100Y200AC0102031800BELK0000BM0000|";

      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var inbox = new List<MqttApplicationMessage>();
          var mqttClient = environment.CreateClient();

          // Both the subscribe and the publish happen as soon as the connection is up.
          mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async args =>
          {
              await mqttClient.SubscribeAsync(Topic);

              var messageBuilder = new MqttApplicationMessageBuilder()
                  .WithPayload(Payload)
                  .WithTopic(Topic);

              await mqttClient.PublishAsync(messageBuilder.Build());
          });

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          var connectOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .Build();
          await mqttClient.ConnectAsync(connectOptions);

          await Task.Delay(500);

          Assert.AreEqual(1, inbox.Count);
          Assert.AreEqual(Payload, inbox.First().ConvertPayloadToString());
      }
  }
  /// <summary>
  /// A QoS 1 subscriber with a persistent session uses a handler that always waits one
  /// second and then throws. The test expects the subscriber to be disconnected and,
  /// after a reconnect, the handler to have run exactly twice in total.
  /// </summary>
  [TestMethod]
  public async Task Message_Send_Retry()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          environment.IgnoreClientLogErrors = true;
          environment.IgnoreServerLogErrors = true;

          var serverOptions = new MqttServerOptionsBuilder()
              .WithPersistentSessions()
              .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(250));
          await environment.StartServerAsync(serverOptions);

          var receiverOptions = new MqttClientOptionsBuilder().WithCleanSession(false);
          var receiver = await environment.ConnectClientAsync(receiverOptions);
          await receiver.SubscribeAsync("x", MqttQualityOfServiceLevel.AtLeastOnce);

          var handlerInvocations = 0;

          // Counts the invocation, stalls for a second and then fails.
          async Task FailingHandler(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              handlerInvocations++;

              await Task.Delay(1000);

              throw new Exception("Broken!");
          }

          receiver.UseApplicationMessageReceivedHandler(FailingHandler);

          var sender = await environment.ConnectClientAsync();
          await sender.PublishAsync("x");

          await Task.Delay(3000);

          // The server should disconnect clients which are not responding.
          Assert.IsFalse(receiver.IsConnected);

          await receiver.ReconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          Assert.AreEqual(2, handlerInvocations);
      }
  }
  /// <summary>
  /// Connects a client without assigning a connected handler in the test and checks
  /// that the connect completes and the client reports being connected.
  /// </summary>
  [TestMethod]
  public async Task NoConnectedHandler_Connect_DoesNotThrowException()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = await environment.ConnectClientAsync();
          var isConnected = mqttClient.IsConnected;

          Assert.IsTrue(isConnected);
      }
  }
  /// <summary>
  /// Disconnects a client without assigning a disconnected handler in the test and
  /// checks that the disconnect completes and the client reports being disconnected.
  /// </summary>
  [TestMethod]
  public async Task NoDisconnectedHandler_Disconnect_DoesNotThrowException()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = await environment.ConnectClientAsync();
          var connectedBefore = mqttClient.IsConnected;
          Assert.IsTrue(connectedBefore);

          await mqttClient.DisconnectAsync();

          var connectedAfter = mqttClient.IsConnected;
          Assert.IsFalse(connectedAfter);
      }
  }
  /// <summary>
  /// Connects 100 clients one after another with the same client ID "a". Only the
  /// last one is expected to stay connected, the server is expected to report one
  /// client and one session, and the surviving client must still receive messages.
  /// </summary>
  [TestMethod]
  public async Task Frequent_Connects()
  {
      const int ClientCount = 100;

      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          await testEnvironment.StartServerAsync();

          var clients = new List<IMqttClient>();
          for (var i = 0; i < ClientCount; i++)
          {
              clients.Add(await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a")));
          }

          await Task.Delay(500);

          var clientStatus = await testEnvironment.Server.GetClientStatusAsync();
          var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();

          // Every client except the last one must be disconnected. The previous loop
          // bound of 98 skipped clients[98], and the failure message claimed the
          // opposite of what the assertion detects.
          var lastIndex = clients.Count - 1;
          for (var i = 0; i < lastIndex; i++)
          {
              Assert.IsFalse(clients[i].IsConnected, $"clients[{i}] is still connected");
          }

          Assert.IsTrue(clients[lastIndex].IsConnected);

          Assert.AreEqual(1, clientStatus.Count);
          Assert.AreEqual(1, sessionStatus.Count);

          // The surviving client must still be able to receive messages.
          var receiveClient = clients[lastIndex];
          string receivedPayload = null;
          receiveClient.UseApplicationMessageReceivedHandler(e =>
          {
              receivedPayload = e.ApplicationMessage.ConvertPayloadToString();
          });

          await receiveClient.SubscribeAsync("x");

          var sendClient = await testEnvironment.ConnectClientAsync();
          await sendClient.PublishAsync("x", "1");

          await Task.Delay(250);

          Assert.AreEqual("1", receivedPayload);
      }
  }
  /// <summary>
  /// Publishes a message on topic "A" without a payload and checks that the
  /// subscriber receives it with the same topic and a null payload.
  /// </summary>
  [TestMethod]
  public async Task No_Payload()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var subscriber = await environment.ConnectClientAsync();

          // Only the topic is set; no payload is assigned to the builder.
          var messageBuilder = new MqttApplicationMessageBuilder().WithTopic("A");

          var catchAllFilter = new MqttTopicFilter { Topic = "#" };
          var subscribeOptions = new MqttClientSubscribeOptions
          {
              TopicFilters = new List<MqttTopicFilter> { catchAllFilter }
          };
          await subscriber.SubscribeAsync(subscribeOptions, CancellationToken.None);

          MqttApplicationMessage delivered = null;
          subscriber.UseApplicationMessageReceivedHandler(args => delivered = args.ApplicationMessage);

          await publisher.PublishAsync(messageBuilder.Build(), CancellationToken.None);

          await Task.Delay(1000);

          Assert.IsNotNull(delivered);
          Assert.AreEqual("A", delivered.Topic);
          Assert.IsNull(delivered.Payload);
      }
  }
  584. }
  585. }