You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

1417 lines
52 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Adapter;
  3. using MQTTnet.Client;
  4. using MQTTnet.Client.Connecting;
  5. using MQTTnet.Client.Disconnecting;
  6. using MQTTnet.Client.Options;
  7. using MQTTnet.Client.Receiving;
  8. using MQTTnet.Client.Subscribing;
  9. using MQTTnet.Implementations;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Server;
  12. using MQTTnet.Tests.Mockups;
  13. using System;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Net.Sockets;
  17. using System.Text;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. namespace MQTTnet.Tests
  21. {
  22. [TestClass]
  23. public class Server_Tests
  24. {
  // Test context handed to every TestEnvironment created by the tests in this class.
  // NOTE(review): presumably assigned by the MSTest runner before each test - confirm.
  25. public TestContext TestContext { get; set; }
  /// <summary>
  /// Connects a client with an empty client ID and expects a live connection
  /// that reports no existing session.
  /// </summary>
  [TestMethod]
  public async Task Use_Empty_Client_ID()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();
          var mqttClient = environment.CreateClient();

          // The empty client ID is the point of this test.
          var optionsBuilder = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithClientId(string.Empty);

          var result = await mqttClient.ConnectAsync(optionsBuilder.Build());

          Assert.IsFalse(result.IsSessionPresent);
          Assert.IsTrue(mqttClient.IsConnected);
      }
  }
  /// <summary>
  /// Runs the shared TestPublishAsync scenario with topic "A/B/C" and QoS AtMostOnce
  /// for both topic/QoS argument pairs.
  /// NOTE(review): the numeric argument is presumably the expected message count - confirm against TestPublishAsync.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Most_Once_0x00()
  {
      const string topic = "A/B/C";
      const MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce;

      await TestPublishAsync(topic, qos, topic, qos, 1, TestContext);
  }
  /// <summary>
  /// Runs the shared TestPublishAsync scenario with topic "A/B/C" and QoS AtLeastOnce
  /// for both topic/QoS argument pairs.
  /// NOTE(review): the numeric argument is presumably the expected message count - confirm against TestPublishAsync.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Least_Once_0x01()
  {
      const string topic = "A/B/C";
      const MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce;

      await TestPublishAsync(topic, qos, topic, qos, 1, TestContext);
  }
  /// <summary>
  /// Runs the shared TestPublishAsync scenario with topic "A/B/C" and QoS ExactlyOnce
  /// for both topic/QoS argument pairs.
  /// NOTE(review): the numeric argument is presumably the expected message count - confirm against TestPublishAsync.
  /// </summary>
  [TestMethod]
  public async Task Publish_Exactly_Once_0x02()
  {
      const string topic = "A/B/C";
      const MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.ExactlyOnce;

      await TestPublishAsync(topic, qos, topic, qos, 1, TestContext);
  }
  /// <summary>
  /// Connects with the clean-session option and expects the connect result
  /// to report that no session is present.
  /// </summary>
  [TestMethod]
  public async Task Use_Clean_Session()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();
          var mqttClient = environment.CreateClient();

          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithCleanSession()
              .Build();

          var result = await mqttClient.ConnectAsync(options);

          Assert.IsFalse(result.IsSessionPresent);
      }
  }
  /// <summary>
  /// A client that registered a will message disconnects via DisconnectAsync;
  /// a subscriber on "#" is expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Do_Not_Send()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var optionsWithWill = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          // Subscriber that counts every message on any topic.
          var subscriber = await environment.ConnectClientAsync();
          subscriber.ApplicationMessageReceivedHandler =
              new MqttApplicationMessageReceivedHandlerDelegate(e => Interlocked.Increment(ref receivedCount));
          await subscriber.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

          // The will owner leaves through an explicit disconnect.
          var willOwner = await environment.ConnectClientAsync(optionsWithWill);
          await willOwner.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          Assert.AreEqual(0, receivedCount);
      }
  }
  /// <summary>
  /// A client that registered a will message is disposed instead of disconnected;
  /// a subscriber on "#" is expected to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Send()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var optionsWithWill = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          // Subscriber that counts every message on any topic.
          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));
          await subscriber.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

          var willOwner = await environment.ConnectClientAsync(optionsWithWill);

          // Dispose does not send a DISCONNECT packet first, so the will message must be sent.
          willOwner.Dispose();

          await Task.Delay(1000);

          Assert.AreEqual(1, receivedCount);
      }
  }
  /// <summary>
  /// The server-side subscription interceptor rewrites every requested topic filter to "a".
  /// The client subscribes to "b", publishes to "a" and must receive on "a" only.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Subscription()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          // Force the topic to "a" regardless of what the client asks to subscribe to.
          var serverOptions = new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
              context =>
              {
                  context.TopicFilter.Topic = "a";
              });
          await environment.StartServerAsync(serverOptions);

          var sawTopicA = false;
          var sawTopicB = false;

          var mqttClient = await environment.ConnectClientAsync();
          mqttClient.UseApplicationMessageReceivedHandler(e =>
          {
              switch (e.ApplicationMessage.Topic)
              {
                  case "a":
                      sawTopicA = true;
                      break;
                  case "b":
                      sawTopicB = true;
                      break;
              }
          });

          await mqttClient.SubscribeAsync("b");
          await mqttClient.PublishAsync("a");

          await Task.Delay(500);

          Assert.IsTrue(sawTopicA);
          Assert.IsFalse(sawTopicB);
      }
  }
  /// <summary>
  /// Walks through three phases for topic "a": publishing before any subscription (nothing received),
  /// after subscribing (one message received, server subscribe event raised) and after unsubscribing
  /// (count stays at one, server unsubscribe event raised).
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Unsubscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          var server = await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          var publisher = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
          var message = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          // Phase 1: nobody is subscribed yet.
          await publisher.PublishAsync(message);
          await Task.Delay(500);
          Assert.AreEqual(0, receivedCount);

          // Phase 2: subscribe and expect both the server event and the message.
          var subscribedRaised = false;
          server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
          {
              subscribedRaised = args.TopicFilter.Topic == "a" && args.ClientId == subscriber.Options.ClientId;
          });

          var filter = new MqttTopicFilter
          {
              Topic = "a",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
          };
          await subscriber.SubscribeAsync(filter);
          await Task.Delay(250);
          Assert.IsTrue(subscribedRaised, "Subscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(250);
          Assert.AreEqual(1, receivedCount);

          // Phase 3: unsubscribe and expect the count to stay unchanged.
          var unsubscribedRaised = false;
          server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
          {
              unsubscribedRaised = args.TopicFilter == "a" && args.ClientId == subscriber.Options.ClientId;
          });

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(250);
          Assert.IsTrue(unsubscribedRaised, "Unsubscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(500);
          Assert.AreEqual(1, receivedCount);

          // Check once more after an additional wait.
          await Task.Delay(500);
          Assert.AreEqual(1, receivedCount);
      }
  }
  /// <summary>
  /// Subscribes to "a", "b" and "c" with a single subscribe request and verifies
  /// that a publish to each topic increases the received count by one.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Multiple_In_Single_Request()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync();

          var c1 = await testEnvironment.ConnectClientAsync();
          c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("a")
              .WithTopicFilter("b")
              .WithTopicFilter("c")
              .Build());

          var c2 = await testEnvironment.ConnectClientAsync();

          // Assert.AreEqual takes (expected, actual); the expected value goes first so
          // that a failure message reports the right numbers.
          await c2.PublishAsync("a");
          await Task.Delay(100);
          Assert.AreEqual(1, receivedMessagesCount);

          await c2.PublishAsync("b");
          await Task.Delay(100);
          Assert.AreEqual(2, receivedMessagesCount);

          await c2.PublishAsync("c");
          await Task.Delay(100);
          Assert.AreEqual(3, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Subscribes to the topics "0".."999" in one subscribe request, publishes one message
  /// to each of them from a second client and expects 1000 received messages.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Lots_In_Single_Request()
  {
      const int TopicCount = 1000;

      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          // Collect all topic filters in one builder so that they go out in a single request.
          var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
          foreach (var topicNumber in Enumerable.Range(0, TopicCount))
          {
              subscribeOptions.WithTopicFilter(topicNumber.ToString(), MqttQualityOfServiceLevel.AtMostOnce);
          }

          await subscriber.SubscribeAsync(subscribeOptions.Build()).ConfigureAwait(false);

          var publisher = await environment.ConnectClientAsync();

          // The same builder instance is reused; only the topic changes per message.
          var messageBuilder = new MqttApplicationMessageBuilder();
          var nextTopic = 0;
          while (nextTopic < TopicCount)
          {
              messageBuilder.WithTopic(nextTopic.ToString());
              await publisher.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);
              nextTopic++;
          }

          // Wait up to 2000 ms for all messages to arrive.
          SpinWait.SpinUntil(() => receivedCount == TopicCount, 2000);

          Assert.AreEqual(TopicCount, receivedCount);
      }
  }
  /// <summary>
  /// Subscribes to the topics "0".."999" using one subscribe request per topic, publishes one
  /// message to each of them from a second client and expects 1000 received messages.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Lots_In_Multiple_Requests()
  {
      const int TopicCount = 1000;

      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          // One request per topic filter.
          foreach (var topicNumber in Enumerable.Range(0, TopicCount))
          {
              var singleFilterBuilder = new MqttClientSubscribeOptionsBuilder()
                  .WithTopicFilter(topicNumber.ToString(), MqttQualityOfServiceLevel.AtMostOnce);

              await subscriber.SubscribeAsync(singleFilterBuilder.Build()).ConfigureAwait(false);
          }

          var publisher = await environment.ConnectClientAsync();

          // The same builder instance is reused; only the topic changes per message.
          var messageBuilder = new MqttApplicationMessageBuilder();
          var nextTopic = 0;
          while (nextTopic < TopicCount)
          {
              messageBuilder.WithTopic(nextTopic.ToString());
              await publisher.PublishAsync(messageBuilder.Build()).ConfigureAwait(false);
              nextTopic++;
          }

          // Wait up to 2000 ms for all messages to arrive.
          SpinWait.SpinUntil(() => receivedCount == TopicCount, 2000);

          Assert.AreEqual(TopicCount, receivedCount);
      }
  }
  /// <summary>
  /// Subscribes to "a", "b" and "c" with three separate subscribe requests and verifies
  /// that a publish to each topic increases the received count by one.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Multiple_In_Multiple_Request()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync();

          var c1 = await testEnvironment.ConnectClientAsync();
          c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));

          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("a")
              .Build());
          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("b")
              .Build());
          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("c")
              .Build());

          var c2 = await testEnvironment.ConnectClientAsync();

          // Assert.AreEqual takes (expected, actual); the expected value goes first so
          // that a failure message reports the right numbers.
          await c2.PublishAsync("a");
          await Task.Delay(100);
          Assert.AreEqual(1, receivedMessagesCount);

          await c2.PublishAsync("b");
          await Task.Delay(100);
          Assert.AreEqual(2, receivedMessagesCount);

          await c2.PublishAsync("c");
          await Task.Delay(100);
          Assert.AreEqual(3, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Publishes a message on topic "a" through the server object itself and expects
  /// a client subscribed to "a" to receive it exactly once.
  /// </summary>
  [TestMethod]
  public async Task Publish_From_Server()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var server = await environment.StartServerAsync();

          var receivedCount = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          var message = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var filter = new MqttTopicFilter
          {
              Topic = "a",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
          };
          await subscriber.SubscribeAsync(filter);

          await server.PublishAsync(message);

          await Task.Delay(1000);

          Assert.AreEqual(1, receivedCount);
      }
  }
  /// <summary>
  /// Two clients subscribe to "a"; one of them publishes 500 messages to that topic.
  /// Both clients share one counter, which is expected to reach 1000.
  /// </summary>
  [TestMethod]
  public async Task Publish_Multiple_Clients()
  {
      var totalReceived = 0;
      var gate = new object();

      // Shared counting routine used by both clients' receive handlers.
      Action countMessage = () =>
      {
          lock (gate)
          {
              totalReceived++;
          }
      };

      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var first = await environment.ConnectClientAsync();
          var second = await environment.ConnectClientAsync();

          first.UseApplicationMessageReceivedHandler(e => countMessage());
          second.UseApplicationMessageReceivedHandler(e => countMessage());

          await first.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }).ConfigureAwait(false);
          await second.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }).ConfigureAwait(false);

          var message = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          for (var remaining = 500; remaining > 0; remaining--)
          {
              await first.PublishAsync(message).ConfigureAwait(false);
          }

          await Task.Delay(5000);

          // 500 publishes, each delivered to two subscribers.
          Assert.AreEqual(1000, totalReceived);
      }
  }
  /// <summary>
  /// Connects two clients one after the other with the same client ID "a" and clean session
  /// disabled; afterwards the first must be disconnected and the second connected.
  /// </summary>
  [TestMethod]
  public async Task Session_Takeover()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          // Both connections use the very same options.
          var sharedOptions = new MqttClientOptionsBuilder()
              .WithCleanSession(false)
              .WithClientId("a");

          var original = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          var successor = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          Assert.IsFalse(original.IsConnected);
          Assert.IsTrue(successor.IsConnected);
      }
  }
  /// <summary>
  /// A client without any subscription publishes a message and must not receive anything.
  /// </summary>
  [TestMethod]
  public async Task No_Messages_If_No_Subscription()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          // Note: this handler is assigned after the client has already been connected above.
          mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async args =>
          {
              await mqttClient.PublishAsync("Connected");
          });

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          await mqttClient.PublishAsync("Hello");

          await Task.Delay(500);

          Assert.AreEqual(0, inbox.Count);
      }
  }
  /// <summary>
  /// The server subscribes every connecting client to "topic1" from its client-connected handler.
  /// The client then receives nothing for "Hello" but one message for "topic1".
  /// </summary>
  [TestMethod]
  public async Task Set_Subscription_At_Server()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var server = await environment.StartServerAsync();

          // Subscription is created on the server side, not requested by the client.
          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(args => server.SubscribeAsync(args.ClientId, "topic1"));

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          // Topic without a subscription: nothing arrives.
          await mqttClient.PublishAsync("Hello");
          await Task.Delay(100);
          Assert.AreEqual(0, inbox.Count);

          // Topic subscribed by the server: one message arrives.
          await mqttClient.PublishAsync("topic1");
          await Task.Delay(100);
          Assert.AreEqual(1, inbox.Count);
      }
  }
  /// <summary>
  /// Stops the server while a client is connected and expects the client's
  /// disconnected handler to have been invoked exactly once.
  /// </summary>
  [TestMethod]
  public async Task Shutdown_Disconnects_Clients_Gracefully()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var disconnectCalled = 0;

          var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // Count atomically, like the other handler-driven counters in this class; the
          // handler is a callback and may not run on the thread that reads the counter.
          c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => Interlocked.Increment(ref disconnectCalled));

          await Task.Delay(100);

          await server.StopAsync();

          await Task.Delay(100);

          Assert.AreEqual(1, disconnectCalled);
      }
  }
  /// <summary>
  /// Counts the server's client-connected and client-disconnected notifications around a
  /// connect followed by DisconnectAsync; each must have been raised exactly once at the end.
  /// </summary>
  [TestMethod]
  public async Task Handle_Clean_Disconnect()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var server = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var connectedCalls = 0;
          var disconnectedCalls = 0;

          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref connectedCalls));
          server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref disconnectedCalls));

          var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // Right after connecting: one connect, no disconnect.
          Assert.AreEqual(1, connectedCalls);
          Assert.AreEqual(0, disconnectedCalls);

          await Task.Delay(500);
          await mqttClient.DisconnectAsync();
          await Task.Delay(500);

          // After the disconnect: still one connect, now one disconnect.
          Assert.AreEqual(1, connectedCalls);
          Assert.AreEqual(1, disconnectedCalls);
      }
  }
  /// <summary>
  /// Connects and disconnects a client, stops the server in any case, and then asks the
  /// test environment to throw if errors were logged.
  /// </summary>
  [TestMethod]
  public async Task Client_Disconnect_Without_Errors()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var wasConnected = false;

          var server = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          try
          {
              var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());
              wasConnected = true;

              await mqttClient.DisconnectAsync();

              await Task.Delay(500);
          }
          finally
          {
              // The server is stopped even when connecting or disconnecting threw.
              await server.StopAsync();
          }

          Assert.IsTrue(wasConnected);

          environment.ThrowIfLogErrors();
      }
  }
  /// <summary>
  /// Starts 50 parallel tasks; each connects its own client, clears and then sets a retained
  /// message on its own topic ("r0".."r49") and disconnects. Afterwards the server must
  /// report exactly 50 retained messages, one for every topic.
  /// </summary>
  [TestMethod]
  public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  {
      const int ClientCount = 50;

      using (var environment = new TestEnvironment(TestContext))
      {
          var server = await environment.StartServerAsync();

          // Work done by a single client; failures are tracked by the test environment.
          async Task PublishRetainedAsync(int clientIndex)
          {
              try
              {
                  using (var client = await environment.ConnectClientAsync())
                  {
                      // Clear retained message.
                      var clearMessage = new MqttApplicationMessageBuilder()
                          .WithTopic("r" + clientIndex)
                          .WithPayload(new byte[0])
                          .WithRetainFlag()
                          .Build();
                      await client.PublishAsync(clearMessage);

                      // Set retained message.
                      var setMessage = new MqttApplicationMessageBuilder()
                          .WithTopic("r" + clientIndex)
                          .WithPayload("value")
                          .WithRetainFlag()
                          .Build();
                      await client.PublishAsync(setMessage);

                      await client.DisconnectAsync();
                  }
              }
              catch (Exception exception)
              {
                  environment.TrackException(exception);
              }
          }

          var pending = Enumerable.Range(0, ClientCount)
              .Select(clientIndex => Task.Run(() => PublishRetainedAsync(clientIndex)))
              .ToList();

          await Task.WhenAll(pending);

          await Task.Delay(1000);

          var retained = await server.GetRetainedApplicationMessagesAsync();

          Assert.AreEqual(ClientCount, retained.Count);

          for (var clientIndex = 0; clientIndex < ClientCount; clientIndex++)
          {
              Assert.IsTrue(retained.Any(message => message.Topic == "r" + clientIndex));
          }
      }
  }
  /// <summary>
  /// Publishes one retained message on "r" and then repeats unsubscribe/subscribe five times
  /// on another client; every new subscription must raise the received count by one.
  /// </summary>
  [TestMethod]
  public async Task Retained_Messages_Flow()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("r")
              .WithPayload("r")
              .WithRetainFlag()
              .Build();

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var receivedCount = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e =>
          {
              Interlocked.Increment(ref receivedCount);
          });

          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          foreach (var round in Enumerable.Range(0, 5))
          {
              // Unsubscribing must not deliver anything.
              await subscriber.UnsubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round, receivedCount);

              // Subscribing again delivers the retained message once more.
              await subscriber.SubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round + 1, receivedCount);
          }

          await subscriber.DisconnectAsync();
      }
  }
  /// <summary>
  /// A retained message exists on "retained"; a client subscribing to "retained_other"
  /// must not receive anything.
  /// </summary>
  [TestMethod]
  public async Task Receive_No_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var receivedCount = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          // Different topic than the one holding the retained message.
          await subscriber.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained_other").Build());

          await Task.Delay(500);

          Assert.AreEqual(0, receivedCount);
      }
  }
  /// <summary>
  /// A retained message exists on "retained"; a client subscribing to that topic afterwards
  /// must receive exactly one message whose Retain flag is set.
  /// </summary>
  [TestMethod]
  public async Task Receive_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var inbox = new List<MqttApplicationMessage>();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await subscriber.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").Build());

          await Task.Delay(500);

          Assert.AreEqual(1, inbox.Count);
          Assert.IsTrue(inbox[0].Retain);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with a
  /// zero-length payload on the same topic; a later subscriber must receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Empty_Payload()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var setMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(setMessage);

          // Zero-length payload on the same topic.
          var clearMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[0])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(clearMessage);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          await Task.Delay(200);

          var filter = new MqttTopicFilter
          {
              Topic = "retained",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, receivedCount);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with a
  /// null payload on the same topic; a later subscriber must receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Null_Payload()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var receivedCount = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var setMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(setMessage);

          // Null payload on the same topic; the cast selects the byte[] overload.
          var clearMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload((byte[])null)
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(clearMessage);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(e => Interlocked.Increment(ref receivedCount));

          await Task.Delay(200);

          var filter = new MqttTopicFilter
          {
              Topic = "retained",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, receivedCount);
      }
  }
  /// <summary>
  /// The server's application message interceptor replaces every message with a new one on
  /// topic "new_topic"; a message published to "original_topic" must arrive as "new_topic".
  /// </summary>
  [TestMethod]
  public async Task Intercept_Application_Message()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
              context => { context.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; });
          await environment.StartServerAsync(serverOptions);

          string topicSeen = null;

          var mqttClient = await environment.ConnectClientAsync();
          await mqttClient.SubscribeAsync("#");
          mqttClient.UseApplicationMessageReceivedHandler(args => { topicSeen = args.ApplicationMessage.Topic; });

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("original_topic")
              .Build();
          await mqttClient.PublishAsync(outgoing);

          await Task.Delay(500);

          Assert.AreEqual("new_topic", topicSeen);
      }
  }
  /// <summary>
  /// Starts the server with a TestServerStorage and publishes one retained message;
  /// the storage must then contain exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Persist_Retained_Message()
  {
      var storage = new TestServerStorage();

      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithStorage(storage);
          await environment.StartServerAsync(serverOptions);

          var publisher = await environment.ConnectClientAsync();

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);

          await Task.Delay(500);

          Assert.AreEqual(1, storage.Messages.Count);
      }
  }
  /// <summary>
  /// The server publishes to "/test/1" from its client-connected handler. A client subscribed
  /// to "#" must see that topic after four further clients have connected.
  /// </summary>
  [TestMethod]
  public async Task Publish_After_Client_Connects()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var server = await environment.StartServerAsync();
          server.UseClientConnectedHandler(async args =>
          {
              await server.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
          });

          string topicSeen = null;

          var observer = await environment.ConnectClientAsync();
          observer.UseApplicationMessageReceivedHandler(args => { topicSeen = args.ApplicationMessage.Topic; });
          await observer.SubscribeAsync("#");

          // Four more connections, each of which triggers the server-side publish.
          for (var connection = 0; connection < 4; connection++)
          {
              await environment.ConnectClientAsync();
          }

          await Task.Delay(500);

          Assert.AreEqual("/test/1", topicSeen);
      }
  }
  /// <summary>
  /// The server's application message interceptor overwrites the payload with "extended";
  /// a subscriber on "test" must receive exactly that payload text.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Message()
  {
      // Replaces the payload of every intercepted message.
      void ReplacePayload(MqttApplicationMessageInterceptorContext context)
      {
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
      }

      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(ReplacePayload);
          await environment.StartServerAsync(serverOptions);

          var publisher = await environment.ConnectClientAsync();
          var subscriber = await environment.ConnectClientAsync();

          await subscriber.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("test").Build());

          var payloadWasReplaced = false;
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              var text = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Equals("extended", text, StringComparison.Ordinal);
          });

          await publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
  }
  /// <summary>
  /// Publishes a 30 MB payload filled with a repeating sequence of the byte values 32..126
  /// and expects the subscriber to receive an identical payload.
  /// </summary>
  [TestMethod]
  public async Task Send_Long_Body()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          const int PayloadSizeInMB = 30;
          const int CharCount = PayloadSizeInMB * 1024 * 1024;

          // Values cycle through 32..126, i.e. 95 distinct bytes.
          const int FirstChar = 32;
          const int CharRange = 95;

          var longBody = new byte[CharCount];
          for (var index = 0; index < longBody.Length; index++)
          {
              longBody[index] = (byte)(FirstChar + index % CharRange);
          }

          byte[] receivedBody = null;

          await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var receiver = await environment.ConnectClientAsync();
          receiver.UseApplicationMessageReceivedHandler(args =>
          {
              receivedBody = args.ApplicationMessage.Payload;
          });

          await receiver.SubscribeAsync("string");

          var sender = await environment.ConnectClientAsync();
          await sender.PublishAsync("string", longBody);

          await Task.Delay(500);

          // A missing payload is compared as an empty one, which fails the check.
          Assert.IsTrue(longBody.SequenceEqual(receivedBody ?? new byte[0]));
      }
  }
  /// <summary>
  /// The server's connection validator answers every connect with NotAuthorized; connecting
  /// must throw MqttConnectingFailedException carrying the NotAuthorized result code.
  /// </summary>
  [TestMethod]
  public async Task Deny_Connection()
  {
      var denyAllOptions = new MqttServerOptionsBuilder().WithConnectionValidator(validation =>
      {
          validation.ReasonCode = MqttConnectReasonCode.NotAuthorized;
      });

      using (var environment = new TestEnvironment(TestContext))
      {
          environment.IgnoreClientLogErrors = true;

          await environment.StartServerAsync(denyAllOptions);

          var failure = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(
              () => environment.ConnectClientAsync());

          Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, failure.ResultCode);
      }
  }
  // Client IDs that already passed ConnectionValidationHandler; created by the test using it.
  Dictionary<string, bool> _connected;

  /// <summary>
  /// Connection validator that accepts a client ID the first time it is seen and answers
  /// every later connect with the same ID with BadUserNameOrPassword.
  /// </summary>
  /// <param name="eventArgs">Validation context; its ClientId is read and its ReasonCode is set.</param>
  private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs)
  {
      var alreadySeen = _connected.ContainsKey(eventArgs.ClientId);

      if (!alreadySeen)
      {
          // Remember the ID so that the next connect with it gets refused.
          _connected[eventArgs.ClientId] = true;
      }

      eventArgs.ReasonCode = alreadySeen
          ? MqttConnectReasonCode.BadUserNameOrPassword
          : MqttConnectReasonCode.Success;
  }
  /// <summary>
  /// With a connection validator that refuses a client ID it has already seen, a second
  /// connect using the same ID must fail and must not disturb the first client: no disconnect
  /// events are recorded and the first client keeps receiving its own messages.
  /// Recorded events: "c" server connect, "d" server disconnect, "x" client disconnect,
  /// "r" message received.
  /// </summary>
  [TestMethod]
  public async Task Same_Client_Id_Refuse_Connection()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          _connected = new Dictionary<string, bool>();
          var options = new MqttServerOptionsBuilder();
          options.WithConnectionValidator(e => ConnectionValidationHandler(e));
          var server = await testEnvironment.StartServerAsync(options);

          var events = new List<string>();

          // Snapshot of the recorded events, taken under the same lock the handlers use.
          string GetFlow()
          {
              lock (events)
              {
                  return string.Join(string.Empty, events);
              }
          }

          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
          {
              lock (events)
              {
                  events.Add("c");
              }
          });

          server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
          {
              lock (events)
              {
                  events.Add("d");
              }
          });

          var clientOptions = new MqttClientOptionsBuilder()
              .WithClientId("same_id");

          // c
          var c1 = await testEnvironment.ConnectClientAsync(clientOptions);

          c1.UseDisconnectedHandler(_ =>
          {
              lock (events)
              {
                  events.Add("x");
              }
          });

          c1.UseApplicationMessageReceivedHandler(_ =>
          {
              lock (events)
              {
                  events.Add("r");
              }
          });

          // Awaited instead of blocking with Wait() inside an async test.
          await c1.SubscribeAsync("topic");
          await Task.Delay(500);

          await c1.PublishAsync("topic");
          await Task.Delay(500);

          Assert.AreEqual("cr", GetFlow());

          // The assertion lives outside the try block: a bare catch around Assert.Fail
          // would swallow the assertion failure and the check could never fail.
          var secondConnectFailed = false;
          try
          {
              await testEnvironment.ConnectClientAsync(clientOptions);
          }
          catch (Exception)
          {
              // Any connect failure counts; the same id connection is expected to fail.
              secondConnectFailed = true;
          }

          Assert.IsTrue(secondConnectFailed, "same id connection is expected to fail");

          await Task.Delay(500);

          // The refused connect must not have produced any further events.
          Assert.AreEqual("cr", GetFlow());

          await c1.PublishAsync("topic");
          await Task.Delay(500);

          // The first client is still connected and receives its own message.
          Assert.AreEqual("crr", GetFlow());
      }
  }
  /// <summary>
  /// Connects two clients with the same client ID one after the other and checks the order of
  /// the recorded markers: "c" (server connected event), "d" (server disconnected event) and
  /// "r" (message received by the second client).
  /// </summary>
  [TestMethod]
  public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          // Markers are appended by the handlers below; joined they form the observed flow.
          var eventLog = new List<string>();

          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
          {
              lock (eventLog)
              {
                  eventLog.Add("c");
              }
          });

          server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
          {
              lock (eventLog)
              {
                  eventLog.Add("d");
              }
          });

          var sharedClientId = Guid.NewGuid().ToString();
          var clientOptionsBuilder = new MqttClientOptionsBuilder()
              .WithClientId(sharedClientId);

          // c
          var firstClient = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);
          await Task.Delay(500);

          Assert.AreEqual("c", string.Join(string.Empty, eventLog));

          // dc
          // Connect client with same client ID. Should disconnect existing client.
          var secondClient = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);
          await Task.Delay(500);

          Assert.AreEqual("cdc", string.Join(string.Empty, eventLog));

          secondClient.UseApplicationMessageReceivedHandler(_ =>
          {
              lock (eventLog)
              {
                  eventLog.Add("r");
              }
          });

          await secondClient.SubscribeAsync("topic");

          // r
          await secondClient.PublishAsync("topic");
          await Task.Delay(500);

          Assert.AreEqual("cdcr", string.Join(string.Empty, eventLog));

          // nothing
          // The first client is already reported as disconnected; disconnecting it again
          // is expected to add no marker.
          Assert.AreEqual(false, firstClient.IsConnected);
          await firstClient.DisconnectAsync();
          Assert.AreEqual(false, firstClient.IsConnected);
          await Task.Delay(500);

          // d
          Assert.AreEqual(true, secondClient.IsConnected);
          await secondClient.DisconnectAsync();
          await Task.Delay(500);

          await server.StopAsync();

          Assert.AreEqual("cdcrd", string.Join(string.Empty, eventLog));
      }
  }
  /// <summary>
  /// Checks that the server reports one client status while a client is connected and none
  /// after that client has disconnected.
  /// </summary>
  [TestMethod]
  public async Task Remove_Session()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder();
          var server = await testEnvironment.StartServerAsync(serverOptions);

          var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
          await Task.Delay(500);

          var statusWhileConnected = await server.GetClientStatusAsync();
          Assert.AreEqual(1, statusWhileConnected.Count);

          await client.DisconnectAsync();
          await Task.Delay(500);

          var statusAfterDisconnect = await server.GetClientStatusAsync();
          Assert.AreEqual(0, statusAfterDisconnect.Count);
      }
  }
  /// <summary>
  /// Verifies that clients cannot connect while the server is stopped and can connect again
  /// after the server has been restarted on the same port.
  /// </summary>
  [TestMethod]
  public async Task Stop_And_Restart()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var server = await testEnvironment.StartServerAsync();

          await testEnvironment.ConnectClientAsync();

          await server.StopAsync();

          // The failure is recorded in a flag and asserted after the try block.
          // An Assert.Fail placed inside the try would be caught by the catch below as well,
          // which would make this check impossible to fail.
          var connectFailed = false;
          try
          {
              await testEnvironment.ConnectClientAsync();
          }
          catch (Exception)
          {
              connectFailed = true;
          }

          Assert.IsTrue(connectFailed, "Connecting should fail.");

          // Restart on the port of the test environment so that new clients reach the server again.
          await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
          await testEnvironment.ConnectClientAsync();
      }
  }
  /// <summary>
  /// Opens a raw socket connection to a server configured with a one second communication
  /// timeout, sends nothing and expects the server to close the connection. Closing is
  /// observed either as a receive of zero bytes or as a <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Close_Idle_Connection()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

          // Dispose the raw socket when the test ends so it does not leak.
          using (var client = new CrossPlatformSocket(AddressFamily.InterNetwork))
          {
              await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

              // Don't send anything. The server should close the connection.
              await Task.Delay(TimeSpan.FromSeconds(3));

              try
              {
                  var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
                  if (receivedBytes == 0)
                  {
                      // Zero bytes: the connection was closed by the remote side.
                      return;
                  }

                  Assert.Fail("Receive should throw an exception.");
              }
              catch (SocketException)
              {
                  // Expected alternative: the closed connection surfaces as a socket error.
              }
          }
      }
  }
  /// <summary>
  /// Sends bytes which are not a valid MQTT packet over a raw socket and expects the server to
  /// close the connection. Closing is observed either as a receive of zero bytes or as a
  /// <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Send_Garbage()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

          // Send an invalid packet and ensure that the server will close the connection and NOT stay in a
          // waiting state forever. This is security related.
          // Dispose the raw socket when the test ends so it does not leak.
          using (var client = new CrossPlatformSocket(AddressFamily.InterNetwork))
          {
              await client.ConnectAsync("localhost", testEnvironment.ServerPort, CancellationToken.None);

              var buffer = Encoding.UTF8.GetBytes("Garbage");
              await client.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);

              await Task.Delay(TimeSpan.FromSeconds(3));

              try
              {
                  var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
                  if (receivedBytes == 0)
                  {
                      // Zero bytes: the connection was closed by the remote side.
                      return;
                  }

                  Assert.Fail("Receive should throw an exception.");
              }
              catch (SocketException)
              {
                  // Expected alternative: the closed connection surfaces as a socket error.
              }
          }
      }
  }
  /// <summary>
  /// Publishes retained messages on the topics "y" and "n", then subscribes a second client to
  /// both topics while the server's subscription interceptor refuses "n". Only the topic "y"
  /// is expected to arrive at the second client.
  /// </summary>
  [TestMethod]
  public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
          {
              if (c.TopicFilter.Topic != "n")
              {
                  return;
              }

              // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
              c.AcceptSubscription = false;
          });

          await testEnvironment.StartServerAsync(serverOptions);

          // Prepare some retained messages.
          var retainedAllowed = new MqttApplicationMessageBuilder().WithTopic("y").WithPayload("x").WithRetainFlag().Build();
          var retainedDenied = new MqttApplicationMessageBuilder().WithTopic("n").WithPayload("x").WithRetainFlag().Build();

          var publisher = await testEnvironment.ConnectClientAsync();
          await publisher.PublishAsync(retainedAllowed);
          await publisher.PublishAsync(retainedDenied);
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          // Subscribe to all retained message types.
          // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
          var subscriber = await testEnvironment.ConnectClientAsync();

          // Collects the topics of all received messages in arrival order.
          var receivedTopics = new StringBuilder();

          subscriber.UseApplicationMessageReceivedHandler(c =>
          {
              lock (receivedTopics)
              {
                  receivedTopics.Append(c.ApplicationMessage.Topic);
              }
          });

          await subscriber.SubscribeAsync(new MqttTopicFilter { Topic = "y" }, new MqttTopicFilter { Topic = "n" });

          await Task.Delay(500);

          Assert.AreEqual("y", receivedTopics.ToString());
      }
  }
  /// <summary>
  /// With persistent sessions enabled, client "a" subscribes to "x" and disconnects; client "b"
  /// then publishes three messages to "x" and disconnects as well. The test expects no
  /// connected clients, two sessions and three pending messages in the session of client "a".
  /// </summary>
  [TestMethod]
  public async Task Collect_Messages_In_Disconnected_Session()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var persistentOptions = new MqttServerOptionsBuilder().WithPersistentSessions();
          var server = await testEnvironment.StartServerAsync(persistentOptions);

          // Create the session including the subscription.
          var subscriber = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
          await subscriber.SubscribeAsync("x");
          await subscriber.DisconnectAsync();

          await Task.Delay(500);

          var connectedClients = await server.GetClientStatusAsync();
          Assert.AreEqual(0, connectedClients.Count);

          // Publish while the subscriber is offline.
          var publisher = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
          await publisher.PublishAsync("x", "1");
          await publisher.PublishAsync("x", "2");
          await publisher.PublishAsync("x", "3");
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          connectedClients = await server.GetClientStatusAsync();
          var sessions = await server.GetSessionStatusAsync();

          Assert.AreEqual(0, connectedClients.Count);
          Assert.AreEqual(2, sessions.Count);

          var subscriberSession = sessions.First(s => s.ClientId == subscriber.Options.ClientId);
          Assert.AreEqual(3, subscriberSession.PendingApplicationMessagesCount);
      }
  }
  /// <summary>
  /// Shared helper for the publish tests: a "receiver" client subscribes with the given filter,
  /// a "sender" client publishes one message with an empty payload, and the number of messages
  /// seen by the receiver is compared with the expectation.
  /// </summary>
  /// <param name="topic">Topic the sender publishes to.</param>
  /// <param name="qualityOfServiceLevel">Quality of service level of the published message.</param>
  /// <param name="topicFilter">Topic filter the receiver subscribes to.</param>
  /// <param name="filterQualityOfServiceLevel">Quality of service level of the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages the receiver must have counted.</param>
  /// <param name="testContext">Test context handed to the <c>TestEnvironment</c>.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount,
      TestContext testContext)
  {
      using (var testEnvironment = new TestEnvironment(testContext))
      {
          // Incremented from the receive handler, hence the Interlocked call below.
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var receiverOptions = new MqttClientOptionsBuilder().WithClientId("receiver");
          var receiver = await testEnvironment.ConnectClientAsync(receiverOptions);
          receiver.UseApplicationMessageReceivedHandler(args => Interlocked.Increment(ref receivedMessagesCount));

          var subscription = new MqttTopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await receiver.SubscribeAsync(subscription);

          var senderOptions = new MqttClientOptionsBuilder().WithClientId("sender");
          var sender = await testEnvironment.ConnectClientAsync(senderOptions);

          var message = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel)
              .Build();
          await sender.PublishAsync(message);
          await sender.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(500);
          await receiver.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);

          Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Publishes a message on topic "a" while the only subscription is for topic "b" and checks
  /// that the server's undelivered message interceptor was invoked for topic "a".
  /// </summary>
  [TestMethod]
  public async Task Intercept_Undelivered()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          // Topic of the last message handed to the undelivered message interceptor.
          var undeliveredTopic = string.Empty;

          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithUndeliveredMessageInterceptor(
              context =>
              {
                  undeliveredTopic = context.ApplicationMessage.Topic;
              }));

          var client = await testEnvironment.ConnectClientAsync();

          // Subscribe to a different topic so that nobody subscribes to "a".
          await client.SubscribeAsync("b");

          await client.PublishAsync("a", null, MqttQualityOfServiceLevel.ExactlyOnce);

          await Task.Delay(500);

          // Expected value goes first so a failure message reports expected/actual correctly.
          Assert.AreEqual("a", undeliveredTopic);
      }
  }
  1083. }
  1084. }