You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

1477 lines
54 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Adapter;
  3. using MQTTnet.Client;
  4. using MQTTnet.Client.Connecting;
  5. using MQTTnet.Client.Disconnecting;
  6. using MQTTnet.Client.Options;
  7. using MQTTnet.Client.Receiving;
  8. using MQTTnet.Client.Subscribing;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Server;
  12. using MQTTnet.Tests.Mockups;
  13. using System;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Text;
  17. using System.Threading;
  18. using System.Threading.Tasks;
  19. using MQTTnet.Diagnostics;
  20. using MQTTnet.Formatter;
  21. namespace MQTTnet.Tests
  22. {
  23. [TestClass]
  24. public sealed class Server_Tests
  25. {
  26. public TestContext TestContext { get; set; }
  /// <summary>
  /// Connects once per data row with the supplied username/password pair and expects
  /// the connect to succeed with no session present.
  /// </summary>
  /// <param name="username">User name passed to the client options (may be null or empty).</param>
  /// <param name="password">Password passed to the client options (may be null or empty).</param>
  [TestMethod]
  [DataRow("", null)]
  [DataRow("", "")]
  [DataRow(null, null)]
  public async Task Use_Admissible_Credentials(string username, string password)
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var mqttClient = environment.CreateClient();

          var optionsBuilder = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithCredentials(username, password);

          var result = await mqttClient.ConnectAsync(optionsBuilder.Build());

          Assert.IsFalse(result.IsSessionPresent);
          Assert.IsTrue(mqttClient.IsConnected);
      }
  }
  /// <summary>
  /// Connects with a null user name and an empty password and expects the connect to throw a
  /// <see cref="MqttConnectingFailedException"/> whose inner exception is a
  /// <see cref="MqttProtocolViolationException"/> carrying the MQTT-3.1.2-22 message.
  /// </summary>
  [TestMethod]
  public async Task Use_Username_Null_Password_Empty()
  {
      string user = null;
      var pass = string.Empty;

      using (var environment = new TestEnvironment())
      {
          // The connect attempt below is expected to fail.
          environment.IgnoreClientLogErrors = true;

          await environment.StartServerAsync();

          var mqttClient = environment.CreateClient();

          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithCredentials(user, pass)
              .Build();

          var failure = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(async () => await mqttClient.ConnectAsync(options));

          Assert.IsInstanceOfType(failure.InnerException, typeof(MqttProtocolViolationException));
          Assert.AreEqual("Error while authenticating. If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].", failure.Message, false);
      }
  }
  /// <summary>
  /// Connects with an empty client ID and expects the connect to succeed with no session present.
  /// </summary>
  [TestMethod]
  public async Task Use_Empty_Client_ID()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = environment.CreateClient();

          var optionsBuilder = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithClientId(string.Empty);

          var result = await mqttClient.ConnectAsync(optionsBuilder.Build());

          Assert.IsFalse(result.IsSessionPresent);
          Assert.IsTrue(mqttClient.IsConnected);
      }
  }
  /// <summary>
  /// Runs the shared publish helper with "A/B/C" for both topic arguments and
  /// <see cref="MqttQualityOfServiceLevel.AtMostOnce"/> for both QoS arguments.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Most_Once_0x00()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Qos = MqttQualityOfServiceLevel.AtMostOnce;

      // NOTE(review): the literal 1 is presumably the expected message count; the helper is not visible here.
      await TestPublishAsync(Topic, Qos, Topic, Qos, 1, TestContext);
  }
  /// <summary>
  /// Runs the shared publish helper with "A/B/C" for both topic arguments and
  /// <see cref="MqttQualityOfServiceLevel.AtLeastOnce"/> for both QoS arguments.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Least_Once_0x01()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Qos = MqttQualityOfServiceLevel.AtLeastOnce;

      // NOTE(review): the literal 1 is presumably the expected message count; the helper is not visible here.
      await TestPublishAsync(Topic, Qos, Topic, Qos, 1, TestContext);
  }
  /// <summary>
  /// Runs the shared publish helper with "A/B/C" for both topic arguments and
  /// <see cref="MqttQualityOfServiceLevel.ExactlyOnce"/> for both QoS arguments.
  /// </summary>
  [TestMethod]
  public async Task Publish_Exactly_Once_0x02()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Qos = MqttQualityOfServiceLevel.ExactlyOnce;

      // NOTE(review): the literal 1 is presumably the expected message count; the helper is not visible here.
      await TestPublishAsync(Topic, Qos, Topic, Qos, 1, TestContext);
  }
  /// <summary>
  /// Connects with the clean-session option and expects the connect result to report no present session.
  /// </summary>
  [TestMethod]
  public async Task Use_Clean_Session()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = environment.CreateClient();

          var options = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithCleanSession()
              .Build();

          var result = await mqttClient.ConnectAsync(options);

          Assert.IsFalse(result.IsSessionPresent);
      }
  }
  /// <summary>
  /// A client configured with a will message disconnects via <c>DisconnectAsync</c>;
  /// a second client subscribed to "#" is expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Do_Not_Send()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var willOptions = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          // Observer that counts every message it receives on any topic.
          var observer = await environment.ConnectClientAsync();
          observer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(_ => Interlocked.Increment(ref received));
          var everything = new MqttTopicFilterBuilder().WithTopic("#").Build();
          await observer.SubscribeAsync(everything);

          // Client carrying the will message; it disconnects explicitly.
          var willClient = await environment.ConnectClientAsync(willOptions);
          await willClient.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          Assert.AreEqual(0, received);
      }
  }
  /// <summary>
  /// A client configured with a will message is disposed without calling <c>DisconnectAsync</c>;
  /// a second client subscribed to "#" is expected to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Send()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var willOptions = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          // Observer that counts every message it receives on any topic.
          var observer = await environment.ConnectClientAsync();
          observer.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));
          var everything = new MqttTopicFilterBuilder().WithTopic("#").Build();
          await observer.SubscribeAsync(everything);

          var willClient = await environment.ConnectClientAsync(willOptions);

          // Dispose does not send a DISCONNECT packet first, so the will message must be sent.
          willClient.Dispose();

          await Task.Delay(1000);

          Assert.AreEqual(1, received);
      }
  }
  /// <summary>
  /// Installs a subscription interceptor that rewrites every requested topic filter to "a".
  /// The client subscribes to "b", publishes to "a", and is expected to receive on "a" only.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Subscription()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
              context =>
              {
                  // Force the topic to "a" regardless of what the client asked to subscribe to.
                  context.TopicFilter.Topic = "a";
              });

          await environment.StartServerAsync(serverOptions);

          var sawTopicA = false;
          var sawTopicB = false;

          var mqttClient = await environment.ConnectClientAsync();
          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              switch (args.ApplicationMessage.Topic)
              {
                  case "a":
                      sawTopicA = true;
                      break;
                  case "b":
                      sawTopicB = true;
                      break;
              }
          });

          await mqttClient.SubscribeAsync("b");
          await mqttClient.PublishAsync("a");

          await Task.Delay(500);

          Assert.IsTrue(sawTopicA);
          Assert.IsFalse(sawTopicB);
      }
  }
  /// <summary>
  /// Walks one client ("c1") through not-subscribed, subscribed and unsubscribed states while a
  /// second client ("c2") publishes to "a". Checks the received count after each phase and that
  /// the server's subscribed/unsubscribed handlers fired for topic "a" and client "c1".
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Unsubscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          var mqttServer = await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          var publisher = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));

          var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();

          // Phase 1: no subscription yet, so nothing may arrive.
          await publisher.PublishAsync(message);
          await Task.Delay(500);
          Assert.AreEqual(0, received);

          // Phase 2: subscribe and expect both the server event and one delivered message.
          var subscribedRaised = false;
          mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
          {
              subscribedRaised = args.TopicFilter.Topic == "a" && args.ClientId == subscriber.Options.ClientId;
          });

          var filter = new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce };
          await subscriber.SubscribeAsync(filter);
          await Task.Delay(250);
          Assert.IsTrue(subscribedRaised, "Subscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(250);
          Assert.AreEqual(1, received);

          // Phase 3: unsubscribe and expect the server event and no further deliveries.
          var unsubscribedRaised = false;
          mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
          {
              unsubscribedRaised = args.TopicFilter == "a" && args.ClientId == subscriber.Options.ClientId;
          });

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(250);
          Assert.IsTrue(unsubscribedRaised, "Unsubscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(500);
          Assert.AreEqual(1, received);

          // Checked a second time after another delay; the count must still be 1.
          await Task.Delay(500);
          Assert.AreEqual(1, received);
      }
  }
  /// <summary>
  /// Subscribes one client to "a", "b" and "c" in a single subscribe request, then publishes
  /// to each topic from a second client and checks the running received count after each publish.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Multiple_In_Single_Request()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync();

          var c1 = await testEnvironment.ConnectClientAsync();
          c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("a")
              .WithTopicFilter("b")
              .WithTopicFilter("c")
              .Build());

          var c2 = await testEnvironment.ConnectClientAsync();

          // Assert.AreEqual takes (expected, actual); the expected literal goes first so
          // that a failure message reports the values the right way round.
          await c2.PublishAsync("a");
          await Task.Delay(100);
          Assert.AreEqual(1, receivedMessagesCount);

          await c2.PublishAsync("b");
          await Task.Delay(100);
          Assert.AreEqual(2, receivedMessagesCount);

          await c2.PublishAsync("c");
          await Task.Delay(100);
          Assert.AreEqual(3, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Subscribes one client to the topics "0".."499" in a single subscribe request, publishes one
  /// message per topic from a second client and expects 500 messages within 20 seconds.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Lots_In_Single_Request()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          // Collect all 500 filters in one builder so that they go out in one request.
          var subscribeOptions = new MqttClientSubscribeOptionsBuilder();
          foreach (var number in Enumerable.Range(0, 500))
          {
              subscribeOptions.WithTopicFilter(number.ToString(), MqttQualityOfServiceLevel.AtMostOnce);
          }

          await subscriber.SubscribeAsync(subscribeOptions.Build()).ConfigureAwait(false);

          var publisher = await environment.ConnectClientAsync();

          // The same message builder is reused; only the topic changes per iteration.
          var messages = new MqttApplicationMessageBuilder();
          foreach (var number in Enumerable.Range(0, 500))
          {
              messages.WithTopic(number.ToString());
              await publisher.PublishAsync(messages.Build()).ConfigureAwait(false);
          }

          SpinWait.SpinUntil(() => received == 500, TimeSpan.FromSeconds(20));

          Assert.AreEqual(500, received);
      }
  }
  /// <summary>
  /// Subscribes one client to the topics "0".."499" using one subscribe request per topic,
  /// publishes one message per topic from a second client and expects 500 messages within 5000 ms.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Lots_In_Multiple_Requests()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          foreach (var number in Enumerable.Range(0, 500))
          {
              var singleFilter = new MqttClientSubscribeOptionsBuilder()
                  .WithTopicFilter(number.ToString())
                  .Build();

              await subscriber.SubscribeAsync(singleFilter).ConfigureAwait(false);
              await Task.Delay(10);
          }

          var publisher = await environment.ConnectClientAsync();

          // The same message builder is reused; only the topic changes per iteration.
          var messages = new MqttApplicationMessageBuilder();
          foreach (var number in Enumerable.Range(0, 500))
          {
              messages.WithTopic(number.ToString());
              await publisher.PublishAsync(messages.Build()).ConfigureAwait(false);
              await Task.Delay(10);
          }

          SpinWait.SpinUntil(() => received == 500, 5000);

          Assert.AreEqual(500, received);
      }
  }
  /// <summary>
  /// Subscribes one client to "a", "b" and "c" using three separate subscribe requests, then
  /// publishes to each topic from a second client and checks the running received count after each publish.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Multiple_In_Multiple_Request()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync();

          var c1 = await testEnvironment.ConnectClientAsync();
          c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));

          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("a")
              .Build());

          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("b")
              .Build());

          await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
              .WithTopicFilter("c")
              .Build());

          var c2 = await testEnvironment.ConnectClientAsync();

          // Assert.AreEqual takes (expected, actual); the expected literal goes first so
          // that a failure message reports the values the right way round.
          await c2.PublishAsync("a");
          await Task.Delay(100);
          Assert.AreEqual(1, receivedMessagesCount);

          await c2.PublishAsync("b");
          await Task.Delay(100);
          Assert.AreEqual(2, receivedMessagesCount);

          await c2.PublishAsync("c");
          await Task.Delay(100);
          Assert.AreEqual(3, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Publishes a message on topic "a" directly through the server object and expects a client
  /// subscribed to "a" to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Publish_From_Server()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var mqttServer = await environment.StartServerAsync();

          var received = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var filter = new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce };
          await subscriber.SubscribeAsync(filter);

          await mqttServer.PublishAsync(outgoing);

          await Task.Delay(1000);

          Assert.AreEqual(1, received);
      }
  }
  /// <summary>
  /// One client publishes 500 messages on topic "a" while two other clients are subscribed to it;
  /// the combined received count is expected to reach 1000 within 20 seconds.
  /// </summary>
  [TestMethod]
  public async Task Publish_Multiple_Clients()
  {
      var totalReceived = 0;

      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var firstSubscriber = await environment.ConnectClientAsync();
          var secondSubscriber = await environment.ConnectClientAsync();

          // Both subscribers feed the same counter.
          firstSubscriber.UseApplicationMessageReceivedHandler(_ => { Interlocked.Increment(ref totalReceived); });
          secondSubscriber.UseApplicationMessageReceivedHandler(_ => { Interlocked.Increment(ref totalReceived); });

          await firstSubscriber.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }).ConfigureAwait(false);
          await secondSubscriber.SubscribeAsync(new MqttTopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce }).ConfigureAwait(false);

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var remaining = 500;
          while (remaining > 0)
          {
              await publisher.PublishAsync(outgoing).ConfigureAwait(false);
              remaining--;
          }

          SpinWait.SpinUntil(() => totalReceived == 1000, TimeSpan.FromSeconds(20));

          Assert.AreEqual(1000, totalReceived);
      }
  }
  /// <summary>
  /// Connects two clients with the same client ID "a". The first client is expected to end up
  /// disconnected with reason <see cref="MqttClientDisconnectReason.SessionTakenOver"/> while the
  /// second stays connected.
  /// </summary>
  [TestMethod]
  public async Task Session_Takeover()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var sharedOptions = new MqttClientOptionsBuilder()
              .WithCleanSession(false)
              .WithProtocolVersion(MqttProtocolVersion.V500) // Disconnect reason is only available in MQTT 5+
              .WithClientId("a");

          var original = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          // Starts as NormalDisconnection so that the assertion below only passes if the handler ran.
          var observedReason = MqttClientDisconnectReason.NormalDisconnection;
          original.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(args =>
          {
              observedReason = args.Reason;
          });

          var successor = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          Assert.IsFalse(original.IsConnected);
          Assert.IsTrue(successor.IsConnected);
          Assert.AreEqual(MqttClientDisconnectReason.SessionTakenOver, observedReason);
      }
  }
  /// <summary>
  /// A client that never subscribes publishes "Hello" and is expected to receive no messages.
  /// </summary>
  [TestMethod]
  public async Task No_Messages_If_No_Subscription()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          // NOTE(review): this handler is assigned after ConnectClientAsync has completed,
          // so it presumably does not run during this test - confirm.
          mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
          {
              await mqttClient.PublishAsync("Connected");
          });

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          await mqttClient.PublishAsync("Hello");

          await Task.Delay(500);

          Assert.AreEqual(0, inbox.Count);
      }
  }
  /// <summary>
  /// The server subscribes every connecting client to "topic1" from its client-connected handler.
  /// The client then publishes "Hello" (expects nothing received) and "topic1" (expects one message).
  /// </summary>
  [TestMethod]
  public async Task Set_Subscription_At_Server()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var mqttServer = await environment.StartServerAsync();

          // The subscription is created on the server side; the client never calls SubscribeAsync.
          mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => mqttServer.SubscribeAsync(e.ClientId, "topic1"));

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          await mqttClient.PublishAsync("Hello");
          await Task.Delay(100);
          Assert.AreEqual(0, inbox.Count);

          await mqttClient.PublishAsync("topic1");
          await Task.Delay(100);
          Assert.AreEqual(1, inbox.Count);
      }
  }
  /// <summary>
  /// Stops the server while one client is connected and expects that client's disconnected
  /// handler to have been invoked exactly once.
  /// </summary>
  [TestMethod]
  public async Task Shutdown_Disconnects_Clients_Gracefully()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var disconnectCalled = 0;

          var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // The handler is a callback, so the counter is bumped atomically like the
          // counters in the other tests of this class.
          c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => Interlocked.Increment(ref disconnectCalled));

          await Task.Delay(100);

          await server.StopAsync();

          await Task.Delay(100);

          Assert.AreEqual(1, disconnectCalled);
      }
  }
  /// <summary>
  /// Counts the server's client-connected and client-disconnected handler invocations around one
  /// client connect followed by <c>DisconnectAsync</c>; expects 1/0 after connect and 1/1 after disconnect.
  /// </summary>
  [TestMethod]
  public async Task Handle_Clean_Disconnect()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var mqttServer = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var connects = 0;
          var disconnects = 0;

          mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref connects));
          mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref disconnects));

          var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // State right after the connect.
          Assert.AreEqual(1, connects);
          Assert.AreEqual(0, disconnects);

          await Task.Delay(500);
          await mqttClient.DisconnectAsync();
          await Task.Delay(500);

          // State after the disconnect.
          Assert.AreEqual(1, connects);
          Assert.AreEqual(1, disconnects);
      }
  }
  /// <summary>
  /// Connects a client, disconnects it, stops the server and then asks the test environment
  /// to throw if any errors were logged.
  /// </summary>
  [TestMethod]
  public async Task Client_Disconnect_Without_Errors()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var connected = false;

          var mqttServer = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          try
          {
              var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());
              connected = true;

              await mqttClient.DisconnectAsync();

              await Task.Delay(500);
          }
          finally
          {
              // The server is stopped even when connecting or disconnecting throws.
              await mqttServer.StopAsync();
          }

          Assert.IsTrue(connected);

          environment.ThrowIfLogErrors();
      }
  }
  /// <summary>
  /// Starts 50 tasks, 10 ms apart. Each connects its own client and publishes, on its own topic
  /// "r&lt;index&gt;", an empty retained message followed by a retained "value" message. Afterwards
  /// the server is expected to hold exactly one retained message per topic.
  /// </summary>
  [TestMethod]
  public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  {
      const int ClientCount = 50;

      using (var environment = new TestEnvironment(TestContext))
      {
          var mqttServer = await environment.StartServerAsync();

          // Work done by each task; failures are handed to the test environment instead of being thrown.
          async Task PublishRetainedAsync(string topic)
          {
              try
              {
                  using (var client = await environment.ConnectClientAsync())
                  {
                      // Clear retained message.
                      await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
                          .WithPayload(new byte[0]).WithRetainFlag().WithQualityOfServiceLevel(1).Build());

                      // Set retained message.
                      await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
                          .WithPayload("value").WithRetainFlag().WithQualityOfServiceLevel(1).Build());

                      await client.DisconnectAsync();
                  }
              }
              catch (Exception exception)
              {
                  environment.TrackException(exception);
              }
          }

          var workers = new List<Task>();
          for (var index = 0; index < ClientCount; index++)
          {
              // Build the topic before the task starts so that the task does not see a later loop value.
              var topic = "r" + index;
              workers.Add(Task.Run(() => PublishRetainedAsync(topic)));

              await Task.Delay(10);
          }

          await Task.WhenAll(workers);

          await Task.Delay(1000);

          var retained = await mqttServer.GetRetainedApplicationMessagesAsync();

          Assert.AreEqual(ClientCount, retained.Count);

          for (var index = 0; index < ClientCount; index++)
          {
              var expectedTopic = "r" + index;
              Assert.IsTrue(retained.Any(m => m.Topic == expectedTopic));
          }
      }
  }
  /// <summary>
  /// One client publishes a retained message on topic "r" and disconnects. A second client then
  /// unsubscribes and re-subscribes to "r" five times and is expected to receive one more message
  /// after each subscribe.
  /// </summary>
  [TestMethod]
  public async Task Retained_Messages_Flow()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("r")
              .WithPayload("r")
              .WithRetainFlag()
              .Build();

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var received = 0;
          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => { Interlocked.Increment(ref received); });

          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          for (var round = 0; round < 5; round++)
          {
              // Unsubscribing must not deliver anything.
              await subscriber.UnsubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round, received);

              // Each subscribe is expected to deliver one more message.
              await subscriber.SubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round + 1, received);
          }

          await subscriber.DisconnectAsync();
      }
  }
  /// <summary>
  /// A retained message is published on "retained"; a second client subscribing to the different
  /// topic "retained_other" is expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Receive_No_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var received = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          var otherTopic = new MqttTopicFilterBuilder().WithTopic("retained_other").Build();
          await subscriber.SubscribeAsync(otherTopic);

          await Task.Delay(500);

          Assert.AreEqual(0, received);
      }
  }
  /// <summary>
  /// A retained message is published on "retained"; a second client subscribing to the same topic
  /// is expected to receive exactly one message with its retain flag set.
  /// </summary>
  [TestMethod]
  public async Task Receive_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var inbox = new List<MqttApplicationMessage>();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          var sameTopic = new MqttTopicFilterBuilder().WithTopic("retained").Build();
          await subscriber.SubscribeAsync(sameTopic);

          await Task.Delay(500);

          Assert.AreEqual(1, inbox.Count);
          Assert.IsTrue(inbox.First().Retain);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with an empty
  /// payload on the same topic; a later subscriber is expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Empty_Payload()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var withPayload = new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build();
          await publisher.PublishAsync(withPayload);

          var emptyPayload = new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build();
          await publisher.PublishAsync(emptyPayload);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          await Task.Delay(200);

          var filter = new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, received);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with a null
  /// payload on the same topic; a later subscriber is expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Null_Payload()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var received = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var withPayload = new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build();
          await publisher.PublishAsync(withPayload);

          // The cast selects the byte[] overload for the null argument.
          var nullPayload = new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload((byte[])null).WithRetainFlag().Build();
          await publisher.PublishAsync(nullPayload);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref received));

          await Task.Delay(200);

          var filter = new MqttTopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, received);
      }
  }
  /// <summary>
  /// Installs an application message interceptor that replaces every message with a new one on
  /// topic "new_topic". A client subscribed to "#" publishes to "original_topic" and is expected
  /// to see "new_topic".
  /// </summary>
  [TestMethod]
  public async Task Intercept_Application_Message()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
              context => { context.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; });

          await environment.StartServerAsync(serverOptions);

          string lastTopic = null;

          var mqttClient = await environment.ConnectClientAsync();
          await mqttClient.SubscribeAsync("#");
          mqttClient.UseApplicationMessageReceivedHandler(args => { lastTopic = args.ApplicationMessage.Topic; });

          var outgoing = new MqttApplicationMessageBuilder().WithTopic("original_topic").Build();
          await mqttClient.PublishAsync(outgoing);

          await Task.Delay(500);

          Assert.AreEqual("new_topic", lastTopic);
      }
  }
  /// <summary>
  /// Starts the server with a <c>TestServerStorage</c> instance, publishes one retained message
  /// and expects the storage to contain exactly one message afterwards.
  /// </summary>
  [TestMethod]
  public async Task Persist_Retained_Message()
  {
      var storage = new TestServerStorage();

      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithStorage(storage);
          await environment.StartServerAsync(serverOptions);

          var publisher = await environment.ConnectClientAsync();

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);

          await Task.Delay(500);

          Assert.AreEqual(1, storage.Messages.Count);
      }
  }
  /// <summary>
  /// The server publishes to "/test/1" from its client-connected handler. A first client
  /// subscribed to "#" is expected to have received that topic after four more clients connect.
  /// </summary>
  [TestMethod]
  public async Task Publish_After_Client_Connects()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          var mqttServer = await environment.StartServerAsync();
          mqttServer.UseClientConnectedHandler(async e =>
          {
              await mqttServer.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
          });

          string lastTopic = null;

          var listener = await environment.ConnectClientAsync();
          listener.UseApplicationMessageReceivedHandler(args => { lastTopic = args.ApplicationMessage.Topic; });
          await listener.SubscribeAsync("#");

          // Four further connects, each of which triggers the server-side publish.
          for (var extraClient = 0; extraClient < 4; extraClient++)
          {
              await environment.ConnectClientAsync();
          }

          await Task.Delay(500);

          Assert.AreEqual("/test/1", lastTopic);
      }
  }
  /// <summary>
  /// Installs an interceptor that overwrites the payload of every application message with the
  /// ASCII bytes of "extended" and checks that a subscriber on "test" sees exactly that text.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Message()
  {
      // Replaces the payload of the intercepted message in place.
      void ReplacePayload(MqttApplicationMessageInterceptorContext context)
      {
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
      }

      using (var environment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(ReplacePayload);
          await environment.StartServerAsync(serverOptions);

          var publisher = await environment.ConnectClientAsync();
          var subscriber = await environment.ConnectClientAsync();

          var filter = new MqttTopicFilterBuilder().WithTopic("test").Build();
          await subscriber.SubscribeAsync(filter);

          var payloadWasReplaced = false;
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              var text = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Equals("extended", text, StringComparison.Ordinal);
          });

          await publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
  }
  /// <summary>
  /// Publishes a 30 MiB payload filled with the repeating byte sequence 32..126 on topic "string"
  /// and expects a subscriber to receive a byte-for-byte identical payload within five seconds.
  /// </summary>
  [TestMethod]
  public async Task Send_Long_Body()
  {
      using (var environment = new TestEnvironment(TestContext))
      {
          const int PayloadSizeInMB = 30;
          const int PayloadLength = PayloadSizeInMB * 1024 * 1024;

          // The range 32..126 holds 95 distinct values, so the pattern repeats every 95 bytes.
          const int FirstValue = 32;
          const int ValueCount = 126 - FirstValue + 1;

          var payload = new byte[PayloadLength];
          for (var index = 0; index < payload.Length; index++)
          {
              payload[index] = (byte)(FirstValue + index % ValueCount);
          }

          byte[] receivedPayload = null;

          await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              receivedPayload = args.ApplicationMessage.Payload;
          });

          await subscriber.SubscribeAsync("string");

          var publisher = await environment.ConnectClientAsync();
          await publisher.PublishAsync("string", payload);

          await Task.Delay(TimeSpan.FromSeconds(5));

          // A missing payload is compared as an empty array, which fails the assertion.
          Assert.IsTrue(payload.SequenceEqual(receivedPayload ?? new byte[0]));
      }
  }
  /// <summary>
  /// Configures a connection validator that answers every connect with NotAuthorized and checks
  /// that the client surfaces this as a <see cref="MqttConnectingFailedException"/> with the matching result code.
  /// </summary>
  [TestMethod]
  public async Task Deny_Connection()
  {
      var rejectingServerOptions = new MqttServerOptionsBuilder().WithConnectionValidator(validatorContext =>
      {
          validatorContext.ReasonCode = MqttConnectReasonCode.NotAuthorized;
      });

      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          await testEnvironment.StartServerAsync(rejectingServerOptions);

          Func<Task> connect = () => testEnvironment.ConnectClientAsync();
          var exception = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(connect);

          Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, exception.ResultCode);
      }
  }
  // Client IDs that have already been accepted by ConnectionValidationHandler.
  Dictionary<string, bool> _connected;

  /// <summary>
  /// Connection validator that accepts the first connect for a client ID and answers any later
  /// connect using the same ID with BadUserNameOrPassword.
  /// </summary>
  /// <param name="eventArgs">Validation context; its ClientId is read and its ReasonCode is set.</param>
  private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs)
  {
      var clientId = eventArgs.ClientId;
      var alreadyRegistered = _connected.ContainsKey(clientId);

      if (!alreadyRegistered)
      {
          // Remember the ID so the next connect with it is refused.
          _connected[clientId] = true;
      }

      eventArgs.ReasonCode = alreadyRegistered
          ? MqttConnectReasonCode.BadUserNameOrPassword
          : MqttConnectReasonCode.Success;
  }
  /// <summary>
  /// Uses <c>ConnectionValidationHandler</c> to refuse a second connection with an already used client ID
  /// and checks that the first client stays connected and keeps receiving messages.
  /// Recorded events: "c" server-side connect, "d" server-side disconnect, "x" client-side disconnect
  /// of the first client, "r" message received by the first client.
  /// </summary>
  [TestMethod]
  public async Task Same_Client_Id_Refuse_Connection()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          _connected = new Dictionary<string, bool>();
          var options = new MqttServerOptionsBuilder();
          options.WithConnectionValidator(e => ConnectionValidationHandler(e));
          var server = await testEnvironment.StartServerAsync(options);

          var events = new List<string>();

          // Handlers run on other threads, so both appending and reading happen under the same lock.
          void Record(string marker)
          {
              lock (events)
              {
                  events.Add(marker);
              }
          }

          string Flow()
          {
              lock (events)
              {
                  return string.Join(string.Empty, events);
              }
          }

          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Record("c"));
          server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Record("d"));

          var clientOptions = new MqttClientOptionsBuilder()
              .WithClientId("same_id");

          // c
          var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
          c1.UseDisconnectedHandler(_ => Record("x"));
          c1.UseApplicationMessageReceivedHandler(_ => Record("r"));

          // Awaited instead of blocking with Wait() so the test thread is not tied up.
          await c1.SubscribeAsync("topic");
          await Task.Delay(500);

          // r
          await c1.PublishAsync("topic");
          await Task.Delay(500);

          Assert.AreEqual("cr", Flow());

          // The failure is recorded in a flag and asserted outside the try block. Calling Assert.Fail
          // inside the try would be swallowed by the catch and could never fail the test.
          var secondConnectionRefused = false;
          try
          {
              await testEnvironment.ConnectClientAsync(clientOptions);
          }
          catch (Exception)
          {
              // same id connection is expected to fail
              secondConnectionRefused = true;
          }

          Assert.IsTrue(secondConnectionRefused, "same id connection is expected to fail");

          await Task.Delay(500);

          // nothing: the refused attempt must not produce connect/disconnect events.
          Assert.AreEqual("cr", Flow());

          // r: the first client is still connected and subscribed.
          await c1.PublishAsync("topic");
          await Task.Delay(500);

          Assert.AreEqual("crr", Flow());
      }
  }
  /// <summary>
  /// Connects two clients with the same client ID and checks the order of the server-side
  /// connected ("c") and disconnected ("d") events, plus a message received ("r") by the second client.
  /// </summary>
  [TestMethod]
  public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var events = new List<string>();

          // Handlers run on other threads, so both appending and reading happen under the same lock.
          void Record(string marker)
          {
              lock (events)
              {
                  events.Add(marker);
              }
          }

          string Flow()
          {
              lock (events)
              {
                  return string.Join(string.Empty, events);
              }
          }

          server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Record("c"));
          server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Record("d"));

          var clientOptionsBuilder = new MqttClientOptionsBuilder()
              .WithClientId(Guid.NewGuid().ToString());

          // c
          var c1 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);

          await Task.Delay(500);

          Assert.AreEqual("c", Flow());

          // dc
          // Connect client with same client ID. Should disconnect existing client.
          var c2 = await testEnvironment.ConnectClientAsync(clientOptionsBuilder);

          await Task.Delay(500);

          Assert.AreEqual("cdc", Flow());

          c2.UseApplicationMessageReceivedHandler(_ => Record("r"));
          await c2.SubscribeAsync("topic");

          // r
          await c2.PublishAsync("topic");

          await Task.Delay(500);

          Assert.AreEqual("cdcr", Flow());

          // nothing: c1 was already taken over, so disconnecting it again must not add an event.
          Assert.IsFalse(c1.IsConnected);
          await c1.DisconnectAsync();
          Assert.IsFalse(c1.IsConnected);

          await Task.Delay(500);

          // d
          Assert.IsTrue(c2.IsConnected);
          await c2.DisconnectAsync();

          await Task.Delay(500);

          await server.StopAsync();

          Assert.AreEqual("cdcrd", Flow());
      }
  }
  /// <summary>
  /// Checks that the server reports one client status while a client is connected
  /// and none after that client has disconnected.
  /// </summary>
  [TestMethod]
  public async Task Remove_Session()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
          await Task.Delay(500);

          var statusWhileConnected = await server.GetClientStatusAsync();
          Assert.AreEqual(1, statusWhileConnected.Count);

          await client.DisconnectAsync();
          await Task.Delay(500);

          var statusAfterDisconnect = await server.GetClientStatusAsync();
          Assert.AreEqual(0, statusAfterDisconnect.Count);
      }
  }
  /// <summary>
  /// Checks that clients cannot connect while the server is stopped and can connect again
  /// after the server has been restarted on the same port.
  /// </summary>
  [TestMethod]
  public async Task Stop_And_Restart()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var server = await testEnvironment.StartServerAsync();

          await testEnvironment.ConnectClientAsync();
          await server.StopAsync();

          // The failure is recorded in a flag and asserted outside the try block. Calling Assert.Fail
          // inside the try would be swallowed by the catch and could never fail the test.
          var connectFailedWhileStopped = false;
          try
          {
              await testEnvironment.ConnectClientAsync();
          }
          catch (Exception)
          {
              // Expected: the server is not running.
              connectFailedWhileStopped = true;
          }

          Assert.IsTrue(connectFailedWhileStopped, "Connecting should fail.");

          var restartOptions = new MqttServerOptionsBuilder()
              .WithDefaultEndpointPort(testEnvironment.ServerPort)
              .Build();
          await server.StartAsync(restartOptions);

          await testEnvironment.ConnectClientAsync();
      }
  }
  /// <summary>
  /// Publishes retained messages on topics "y" and "n", denies subscriptions to "n" through a
  /// subscription interceptor, and checks that a client subscribing to both only receives "y".
  /// </summary>
  [TestMethod]
  public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithSubscriptionInterceptor(context =>
          {
              if (context.TopicFilter.Topic != "n")
              {
                  return;
              }

              // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
              context.AcceptSubscription = false;
          });
          await testEnvironment.StartServerAsync(serverOptions);

          // Prepare some retained messages.
          var publisher = await testEnvironment.ConnectClientAsync();
          foreach (var retainedTopic in new[] { "y", "n" })
          {
              var retainedMessage = new MqttApplicationMessageBuilder()
                  .WithTopic(retainedTopic)
                  .WithPayload("x")
                  .WithRetainFlag()
                  .Build();
              await publisher.PublishAsync(retainedMessage);
          }

          await publisher.DisconnectAsync();

          await Task.Delay(500);

          // Subscribe to all retained message types.
          // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
          var subscriber = await testEnvironment.ConnectClientAsync();

          // Collects the topics of all received messages in arrival order.
          var receivedTopics = new StringBuilder();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (receivedTopics)
              {
                  receivedTopics.Append(args.ApplicationMessage.Topic);
              }
          });

          await subscriber.SubscribeAsync(new MqttTopicFilter { Topic = "y" }, new MqttTopicFilter { Topic = "n" });

          await Task.Delay(500);

          Assert.AreEqual("y", receivedTopics.ToString());
      }
  }
  /// <summary>
  /// With persistent sessions enabled, checks that three messages published while a subscribed client
  /// is disconnected are counted as pending application messages in that client's session.
  /// </summary>
  [TestMethod]
  public async Task Collect_Messages_In_Disconnected_Session()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder().WithPersistentSessions();
          var server = await testEnvironment.StartServerAsync(serverOptions);

          // Create the session including the subscription.
          var subscriberOptions = new MqttClientOptionsBuilder().WithClientId("a");
          var client1 = await testEnvironment.ConnectClientAsync(subscriberOptions);
          await client1.SubscribeAsync("x");
          await client1.DisconnectAsync();

          await Task.Delay(500);

          var connectedClients = await server.GetClientStatusAsync();
          Assert.AreEqual(0, connectedClients.Count);

          // Publish while the subscriber is offline.
          var publisherOptions = new MqttClientOptionsBuilder().WithClientId("b");
          var client2 = await testEnvironment.ConnectClientAsync(publisherOptions);
          await client2.PublishAsync("x", "1");
          await client2.PublishAsync("x", "2");
          await client2.PublishAsync("x", "3");
          await client2.DisconnectAsync();

          await Task.Delay(500);

          connectedClients = await server.GetClientStatusAsync();
          var sessions = await server.GetSessionStatusAsync();

          Assert.AreEqual(0, connectedClients.Count);
          Assert.AreEqual(2, sessions.Count);

          var subscriberSession = sessions.First(s => s.ClientId == client1.Options.ClientId);
          Assert.AreEqual(3, subscriberSession.PendingApplicationMessagesCount);
      }
  }
  /// <summary>
  /// Shared helper: a "receiver" client subscribes with the given filter and QoS, a "sender" client
  /// publishes one empty-payload message to the given topic and QoS, and the number of messages
  /// the receiver got is compared with the expected count.
  /// </summary>
  /// <param name="topic">Topic the message is published to.</param>
  /// <param name="qualityOfServiceLevel">QoS level of the published message.</param>
  /// <param name="topicFilter">Topic filter the receiver subscribes with.</param>
  /// <param name="filterQualityOfServiceLevel">QoS level of the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages the receiver is expected to get.</param>
  /// <param name="testContext">Test context handed to the test environment.</param>
  static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount,
      TestContext testContext)
  {
      using (var testEnvironment = new TestEnvironment(testContext))
      {
          // Incremented from the receive handler, hence the interlocked update.
          var receivedMessagesCount = 0;

          await testEnvironment.StartServerAsync();

          var receiverOptions = new MqttClientOptionsBuilder().WithClientId("receiver");
          var receiver = await testEnvironment.ConnectClientAsync(receiverOptions);
          receiver.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref receivedMessagesCount));

          var subscription = new MqttTopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await receiver.SubscribeAsync(subscription);

          var senderOptions = new MqttClientOptionsBuilder().WithClientId("sender");
          var sender = await testEnvironment.ConnectClientAsync(senderOptions);

          var message = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel)
              .Build();
          await sender.PublishAsync(message);
          await sender.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(500);
          await receiver.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);

          Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Publishes to topic "a" while the only subscription is for "b" and checks that the
  /// undelivered message interceptor was called with the message for topic "a".
  /// </summary>
  [TestMethod]
  public async Task Intercept_Undelivered()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          // Topic of the last message handed to the undelivered message interceptor.
          var undeliveredTopic = string.Empty;

          var serverOptions = new MqttServerOptionsBuilder().WithUndeliveredMessageInterceptor(interceptorContext =>
          {
              undeliveredTopic = interceptorContext.ApplicationMessage.Topic;
          });

          await testEnvironment.StartServerAsync(serverOptions);

          var client = await testEnvironment.ConnectClientAsync();
          await client.SubscribeAsync("b");

          await client.PublishAsync("a", null, MqttQualityOfServiceLevel.ExactlyOnce);

          await Task.Delay(500);

          Assert.AreEqual("a", undeliveredTopic);
      }
  }
  /// <summary>
  /// Installs a client message queue interceptor that rewrites every topic to "a" and checks that
  /// a message published to "b" arrives at the client with topic "a" and never with topic "b".
  /// </summary>
  [TestMethod]
  public async Task Intercept_ClientMessageQueue()
  {
      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          var serverOptions = new MqttServerOptionsBuilder()
              .WithClientMessageQueueInterceptor(c => c.ApplicationMessage.Topic = "a");
          await testEnvironment.StartServerAsync(serverOptions);

          var topicAReceived = false;
          var topicBReceived = false;

          var client = await testEnvironment.ConnectClientAsync();
          client.UseApplicationMessageReceivedHandler(args =>
          {
              switch (args.ApplicationMessage.Topic)
              {
                  case "a":
                      topicAReceived = true;
                      break;

                  case "b":
                      topicBReceived = true;
                      break;
              }
          });

          await client.SubscribeAsync("a");
          await client.SubscribeAsync("b");

          await client.PublishAsync("b");

          await Task.Delay(500);

          Assert.IsTrue(topicAReceived);
          Assert.IsFalse(topicBReceived);
      }
  }
  /// <summary>
  /// With a no-op client message queue interceptor installed, checks that a message published with
  /// QoS AtMostOnce is still delivered to a subscription made with QoS AtLeastOnce.
  /// </summary>
  [TestMethod]
  public async Task Intercept_ClientMessageQueue_Different_QoS_Of_Subscription_And_Message()
  {
      const string topic = "a";

      using (var testEnvironment = new TestEnvironment(TestContext))
      {
          // Interceptor does nothing but has to be present.
          var serverOptions = new MqttServerOptionsBuilder()
              .WithClientMessageQueueInterceptor(c => { });
          await testEnvironment.StartServerAsync(serverOptions);

          var messageReceived = false;

          var client = await testEnvironment.ConnectClientAsync();
          client.UseApplicationMessageReceivedHandler(args =>
          {
              messageReceived = true;
          });

          await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);

          var message = new MqttApplicationMessage
          {
              Topic = topic,
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };
          await client.PublishAsync(message);

          await Task.Delay(500);

          Assert.IsTrue(messageReceived);
      }
  }
  1123. }
  1124. }