You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1173 lines
43 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Microsoft.VisualStudio.TestTools.UnitTesting;
  9. using MQTTnet.Adapter;
  10. using MQTTnet.Client;
  11. using MQTTnet.Client.Connecting;
  12. using MQTTnet.Client.Disconnecting;
  13. using MQTTnet.Client.Options;
  14. using MQTTnet.Client.Receiving;
  15. using MQTTnet.Client.Subscribing;
  16. using MQTTnet.Protocol;
  17. using MQTTnet.Server;
  18. using MQTTnet.Tests.Mockups;
  19. namespace MQTTnet.Tests
  20. {
  21. [TestClass]
  22. public class Server_Tests
  23. {
  24. [TestMethod]
  25. public async Task Publish_At_Most_Once_0x00()
  26. {
  27. await TestPublishAsync(
  28. "A/B/C",
  29. MqttQualityOfServiceLevel.AtMostOnce,
  30. "A/B/C",
  31. MqttQualityOfServiceLevel.AtMostOnce,
  32. 1);
  33. }
  34. [TestMethod]
  35. public async Task Publish_At_Least_Once_0x01()
  36. {
  37. await TestPublishAsync(
  38. "A/B/C",
  39. MqttQualityOfServiceLevel.AtLeastOnce,
  40. "A/B/C",
  41. MqttQualityOfServiceLevel.AtLeastOnce,
  42. 1);
  43. }
  44. [TestMethod]
  45. public async Task Publish_Exactly_Once_0x02()
  46. {
  47. await TestPublishAsync(
  48. "A/B/C",
  49. MqttQualityOfServiceLevel.ExactlyOnce,
  50. "A/B/C",
  51. MqttQualityOfServiceLevel.ExactlyOnce,
  52. 1);
  53. }
  54. [TestMethod]
  55. public async Task Use_Clean_Session()
  56. {
  57. using (var testEnvironment = new TestEnvironment())
  58. {
  59. await testEnvironment.StartServerAsync();
  60. var client = testEnvironment.CreateClient();
  61. var connectResult = await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost", testEnvironment.ServerPort).WithCleanSession().Build());
  62. Assert.IsFalse(connectResult.IsSessionPresent);
  63. }
  64. }
  65. [TestMethod]
  66. public async Task Will_Message_Do_Not_Send()
  67. {
  68. using (var testEnvironment = new TestEnvironment())
  69. {
  70. var receivedMessagesCount = 0;
  71. await testEnvironment.StartServerAsync();
  72. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  73. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  74. var c1 = await testEnvironment.ConnectClientAsync();
  75. c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
  76. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  77. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  78. await c2.DisconnectAsync().ConfigureAwait(false);
  79. await Task.Delay(1000);
  80. Assert.AreEqual(0, receivedMessagesCount);
  81. }
  82. }
  83. [TestMethod]
  84. public async Task Will_Message_Send()
  85. {
  86. using (var testEnvironment = new TestEnvironment())
  87. {
  88. var receivedMessagesCount = 0;
  89. await testEnvironment.StartServerAsync();
  90. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  91. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  92. var c1 = await testEnvironment.ConnectClientAsync();
  93. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  94. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  95. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  96. c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
  97. await Task.Delay(1000);
  98. Assert.AreEqual(1, receivedMessagesCount);
  99. }
  100. }
  101. [TestMethod]
  102. public async Task Intercept_Subscription()
  103. {
  104. using (var testEnvironment = new TestEnvironment())
  105. {
  106. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
  107. c =>
  108. {
  109. // Set the topic to "a" regards what the client wants to subscribe.
  110. c.TopicFilter.Topic = "a";
  111. }));
  112. var topicAReceived = false;
  113. var topicBReceived = false;
  114. var client = await testEnvironment.ConnectClientAsync();
  115. client.UseApplicationMessageReceivedHandler(c =>
  116. {
  117. if (c.ApplicationMessage.Topic == "a")
  118. {
  119. topicAReceived = true;
  120. }
  121. else if (c.ApplicationMessage.Topic == "b")
  122. {
  123. topicBReceived = true;
  124. }
  125. });
  126. await client.SubscribeAsync("b");
  127. await client.PublishAsync("a");
  128. await Task.Delay(500);
  129. Assert.IsTrue(topicAReceived);
  130. Assert.IsFalse(topicBReceived);
  131. }
  132. }
  133. [TestMethod]
  134. public async Task Subscribe_Unsubscribe()
  135. {
  136. using (var testEnvironment = new TestEnvironment())
  137. {
  138. var receivedMessagesCount = 0;
  139. var server = await testEnvironment.StartServerAsync();
  140. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
  141. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  142. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
  143. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  144. await c2.PublishAsync(message);
  145. await Task.Delay(500);
  146. Assert.AreEqual(0, receivedMessagesCount);
  147. var subscribeEventCalled = false;
  148. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e =>
  149. {
  150. subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
  151. });
  152. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  153. await Task.Delay(250);
  154. Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");
  155. await c2.PublishAsync(message);
  156. await Task.Delay(250);
  157. Assert.AreEqual(1, receivedMessagesCount);
  158. var unsubscribeEventCalled = false;
  159. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
  160. {
  161. unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
  162. });
  163. await c1.UnsubscribeAsync("a");
  164. await Task.Delay(250);
  165. Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");
  166. await c2.PublishAsync(message);
  167. await Task.Delay(500);
  168. Assert.AreEqual(1, receivedMessagesCount);
  169. await Task.Delay(500);
  170. Assert.AreEqual(1, receivedMessagesCount);
  171. }
  172. }
  173. [TestMethod]
  174. public async Task Subscribe_Multiple_In_Single_Request()
  175. {
  176. using (var testEnvironment = new TestEnvironment())
  177. {
  178. var receivedMessagesCount = 0;
  179. await testEnvironment.StartServerAsync();
  180. var c1 = await testEnvironment.ConnectClientAsync();
  181. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  182. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  183. .WithTopicFilter("a")
  184. .WithTopicFilter("b")
  185. .WithTopicFilter("c")
  186. .Build());
  187. var c2 = await testEnvironment.ConnectClientAsync();
  188. await c2.PublishAsync("a");
  189. await Task.Delay(100);
  190. Assert.AreEqual(receivedMessagesCount, 1);
  191. await c2.PublishAsync("b");
  192. await Task.Delay(100);
  193. Assert.AreEqual(receivedMessagesCount, 2);
  194. await c2.PublishAsync("c");
  195. await Task.Delay(100);
  196. Assert.AreEqual(receivedMessagesCount, 3);
  197. }
  198. }
  199. [TestMethod]
  200. public async Task Subscribe_Multiple_In_Multiple_Request()
  201. {
  202. using (var testEnvironment = new TestEnvironment())
  203. {
  204. var receivedMessagesCount = 0;
  205. await testEnvironment.StartServerAsync();
  206. var c1 = await testEnvironment.ConnectClientAsync();
  207. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  208. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  209. .WithTopicFilter("a")
  210. .Build());
  211. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  212. .WithTopicFilter("b")
  213. .Build());
  214. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  215. .WithTopicFilter("c")
  216. .Build());
  217. var c2 = await testEnvironment.ConnectClientAsync();
  218. await c2.PublishAsync("a");
  219. await Task.Delay(100);
  220. Assert.AreEqual(receivedMessagesCount, 1);
  221. await c2.PublishAsync("b");
  222. await Task.Delay(100);
  223. Assert.AreEqual(receivedMessagesCount, 2);
  224. await c2.PublishAsync("c");
  225. await Task.Delay(100);
  226. Assert.AreEqual(receivedMessagesCount, 3);
  227. }
  228. }
  229. [TestMethod]
  230. public async Task Publish_From_Server()
  231. {
  232. using (var testEnvironment = new TestEnvironment())
  233. {
  234. var server = await testEnvironment.StartServerAsync();
  235. var receivedMessagesCount = 0;
  236. var client = await testEnvironment.ConnectClientAsync();
  237. client.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  238. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  239. await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  240. await server.PublishAsync(message);
  241. await Task.Delay(1000);
  242. Assert.AreEqual(1, receivedMessagesCount);
  243. }
  244. }
  245. [TestMethod]
  246. public async Task Publish_Multiple_Clients()
  247. {
  248. var receivedMessagesCount = 0;
  249. var locked = new object();
  250. using (var testEnvironment = new TestEnvironment())
  251. {
  252. await testEnvironment.StartServerAsync();
  253. var c1 = await testEnvironment.ConnectClientAsync();
  254. var c2 = await testEnvironment.ConnectClientAsync();
  255. c1.UseApplicationMessageReceivedHandler(c =>
  256. {
  257. lock (locked)
  258. {
  259. receivedMessagesCount++;
  260. }
  261. });
  262. c2.UseApplicationMessageReceivedHandler(c =>
  263. {
  264. lock (locked)
  265. {
  266. receivedMessagesCount++;
  267. }
  268. });
  269. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  270. await c2.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  271. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  272. for (var i = 0; i < 1000; i++)
  273. {
  274. await c1.PublishAsync(message);
  275. }
  276. await Task.Delay(500);
  277. Assert.AreEqual(2000, receivedMessagesCount);
  278. }
  279. }
  280. [TestMethod]
  281. public async Task Session_Takeover()
  282. {
  283. using (var testEnvironment = new TestEnvironment())
  284. {
  285. await testEnvironment.StartServerAsync();
  286. var options = new MqttClientOptionsBuilder()
  287. .WithCleanSession(false)
  288. .WithClientId("a");
  289. var client1 = await testEnvironment.ConnectClientAsync(options);
  290. await Task.Delay(500);
  291. var client2 = await testEnvironment.ConnectClientAsync(options);
  292. await Task.Delay(500);
  293. Assert.IsFalse(client1.IsConnected);
  294. Assert.IsTrue(client2.IsConnected);
  295. }
  296. }
  297. [TestMethod]
  298. public async Task No_Messages_If_No_Subscription()
  299. {
  300. using (var testEnvironment = new TestEnvironment())
  301. {
  302. await testEnvironment.StartServerAsync();
  303. var client = await testEnvironment.ConnectClientAsync();
  304. var receivedMessages = new List<MqttApplicationMessage>();
  305. client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
  306. {
  307. await client.PublishAsync("Connected");
  308. });
  309. client.UseApplicationMessageReceivedHandler(c =>
  310. {
  311. lock (receivedMessages)
  312. {
  313. receivedMessages.Add(c.ApplicationMessage);
  314. }
  315. });
  316. await Task.Delay(500);
  317. await client.PublishAsync("Hello");
  318. await Task.Delay(500);
  319. Assert.AreEqual(0, receivedMessages.Count);
  320. }
  321. }
  322. [TestMethod]
  323. public async Task Set_Subscription_At_Server()
  324. {
  325. using (var testEnvironment = new TestEnvironment())
  326. {
  327. var server = await testEnvironment.StartServerAsync();
  328. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
  329. var client = await testEnvironment.ConnectClientAsync();
  330. var receivedMessages = new List<MqttApplicationMessage>();
  331. client.UseApplicationMessageReceivedHandler(c =>
  332. {
  333. lock (receivedMessages)
  334. {
  335. receivedMessages.Add(c.ApplicationMessage);
  336. }
  337. });
  338. await Task.Delay(500);
  339. await client.PublishAsync("Hello");
  340. await Task.Delay(100);
  341. Assert.AreEqual(0, receivedMessages.Count);
  342. await client.PublishAsync("topic1");
  343. await Task.Delay(100);
  344. Assert.AreEqual(1, receivedMessages.Count);
  345. }
  346. }
  347. [TestMethod]
  348. public async Task Shutdown_Disconnects_Clients_Gracefully()
  349. {
  350. using (var testEnvironment = new TestEnvironment())
  351. {
  352. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  353. var disconnectCalled = 0;
  354. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  355. c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => disconnectCalled++);
  356. await Task.Delay(100);
  357. await server.StopAsync();
  358. await Task.Delay(100);
  359. Assert.AreEqual(1, disconnectCalled);
  360. }
  361. }
  362. [TestMethod]
  363. public async Task Handle_Clean_Disconnect()
  364. {
  365. using (var testEnvironment = new TestEnvironment())
  366. {
  367. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  368. var clientConnectedCalled = 0;
  369. var clientDisconnectedCalled = 0;
  370. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref clientConnectedCalled));
  371. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref clientDisconnectedCalled));
  372. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  373. Assert.AreEqual(1, clientConnectedCalled);
  374. Assert.AreEqual(0, clientDisconnectedCalled);
  375. await Task.Delay(500);
  376. await c1.DisconnectAsync();
  377. await Task.Delay(500);
  378. Assert.AreEqual(1, clientConnectedCalled);
  379. Assert.AreEqual(1, clientDisconnectedCalled);
  380. }
  381. }
  382. [TestMethod]
  383. public async Task Client_Disconnect_Without_Errors()
  384. {
  385. using (var testEnvironment = new TestEnvironment())
  386. {
  387. bool clientWasConnected;
  388. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  389. try
  390. {
  391. var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  392. clientWasConnected = true;
  393. await client.DisconnectAsync();
  394. await Task.Delay(500);
  395. }
  396. finally
  397. {
  398. await server.StopAsync();
  399. }
  400. Assert.IsTrue(clientWasConnected);
  401. testEnvironment.ThrowIfLogErrors();
  402. }
  403. }
  404. [TestMethod]
  405. public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  406. {
  407. const int ClientCount = 50;
  408. using (var testEnvironment = new TestEnvironment())
  409. {
  410. var server = await testEnvironment.StartServerAsync();
  411. var tasks = new List<Task>();
  412. for (var i = 0; i < ClientCount; i++)
  413. {
  414. var i2 = i;
  415. var testEnvironment2 = testEnvironment;
  416. tasks.Add(Task.Run(async () =>
  417. {
  418. try
  419. {
  420. using (var client = await testEnvironment2.ConnectClientAsync())
  421. {
  422. // Clear retained message.
  423. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  424. .WithPayload(new byte[0]).WithRetainFlag().Build());
  425. // Set retained message.
  426. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  427. .WithPayload("value").WithRetainFlag().Build());
  428. await client.DisconnectAsync();
  429. }
  430. }
  431. catch (Exception exception)
  432. {
  433. testEnvironment2.TrackException(exception);
  434. }
  435. }));
  436. }
  437. await Task.WhenAll(tasks);
  438. await Task.Delay(1000);
  439. var retainedMessages = await server.GetRetainedApplicationMessagesAsync();
  440. Assert.AreEqual(ClientCount, retainedMessages.Count);
  441. for (var i = 0; i < ClientCount; i++)
  442. {
  443. Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i));
  444. }
  445. }
  446. }
  447. [TestMethod]
  448. public async Task Retained_Messages_Flow()
  449. {
  450. using (var testEnvironment = new TestEnvironment())
  451. {
  452. var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
  453. await testEnvironment.StartServerAsync();
  454. var c1 = await testEnvironment.ConnectClientAsync();
  455. var receivedMessages = 0;
  456. var c2 = await testEnvironment.ConnectClientAsync();
  457. c2.UseApplicationMessageReceivedHandler(c =>
  458. {
  459. Interlocked.Increment(ref receivedMessages);
  460. });
  461. await c1.PublishAsync(retainedMessage);
  462. await c1.DisconnectAsync();
  463. await Task.Delay(500);
  464. for (var i = 0; i < 5; i++)
  465. {
  466. await c2.UnsubscribeAsync("r");
  467. await Task.Delay(100);
  468. Assert.AreEqual(i, receivedMessages);
  469. await c2.SubscribeAsync("r");
  470. await Task.Delay(100);
  471. Assert.AreEqual(i + 1, receivedMessages);
  472. }
  473. await c2.DisconnectAsync();
  474. }
  475. }
  476. [TestMethod]
  477. public async Task Receive_No_Retained_Message_After_Subscribe()
  478. {
  479. using (var testEnvironment = new TestEnvironment())
  480. {
  481. await testEnvironment.StartServerAsync();
  482. var c1 = await testEnvironment.ConnectClientAsync();
  483. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  484. await c1.DisconnectAsync();
  485. var receivedMessagesCount = 0;
  486. var c2 = await testEnvironment.ConnectClientAsync();
  487. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  488. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build());
  489. await Task.Delay(500);
  490. Assert.AreEqual(0, receivedMessagesCount);
  491. }
  492. }
  493. [TestMethod]
  494. public async Task Receive_Retained_Message_After_Subscribe()
  495. {
  496. using (var testEnvironment = new TestEnvironment())
  497. {
  498. await testEnvironment.StartServerAsync();
  499. var c1 = await testEnvironment.ConnectClientAsync();
  500. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  501. await c1.DisconnectAsync();
  502. var receivedMessages = new List<MqttApplicationMessage>();
  503. var c2 = await testEnvironment.ConnectClientAsync();
  504. c2.UseApplicationMessageReceivedHandler(c =>
  505. {
  506. lock (receivedMessages)
  507. {
  508. receivedMessages.Add(c.ApplicationMessage);
  509. }
  510. });
  511. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  512. await Task.Delay(500);
  513. Assert.AreEqual(1, receivedMessages.Count);
  514. Assert.IsTrue(receivedMessages.First().Retain);
  515. }
  516. }
  517. [TestMethod]
  518. public async Task Clear_Retained_Message_With_Empty_Payload()
  519. {
  520. using (var testEnvironment = new TestEnvironment())
  521. {
  522. var receivedMessagesCount = 0;
  523. await testEnvironment.StartServerAsync();
  524. var c1 = await testEnvironment.ConnectClientAsync();
  525. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  526. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
  527. await c1.DisconnectAsync();
  528. var c2 = await testEnvironment.ConnectClientAsync();
  529. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  530. await Task.Delay(200);
  531. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  532. await Task.Delay(500);
  533. Assert.AreEqual(0, receivedMessagesCount);
  534. }
  535. }
  536. [TestMethod]
  537. public async Task Clear_Retained_Message_With_Null_Payload()
  538. {
  539. using (var testEnvironment = new TestEnvironment())
  540. {
  541. var receivedMessagesCount = 0;
  542. await testEnvironment.StartServerAsync();
  543. var c1 = await testEnvironment.ConnectClientAsync();
  544. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  545. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload((byte[])null).WithRetainFlag().Build());
  546. await c1.DisconnectAsync();
  547. var c2 = await testEnvironment.ConnectClientAsync();
  548. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  549. await Task.Delay(200);
  550. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  551. await Task.Delay(500);
  552. Assert.AreEqual(0, receivedMessagesCount);
  553. }
  554. }
  555. [TestMethod]
  556. public async Task Intercept_Application_Message()
  557. {
  558. using (var testEnvironment = new TestEnvironment())
  559. {
  560. await testEnvironment.StartServerAsync(
  561. new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
  562. c => { c.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; }));
  563. string receivedTopic = null;
  564. var c1 = await testEnvironment.ConnectClientAsync();
  565. await c1.SubscribeAsync("#");
  566. c1.UseApplicationMessageReceivedHandler(a => { receivedTopic = a.ApplicationMessage.Topic; });
  567. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("original_topic").Build());
  568. await Task.Delay(500);
  569. Assert.AreEqual("new_topic", receivedTopic);
  570. }
  571. }
  572. [TestMethod]
  573. public async Task Persist_Retained_Message()
  574. {
  575. var serverStorage = new TestServerStorage();
  576. using (var testEnvironment = new TestEnvironment())
  577. {
  578. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage));
  579. var c1 = await testEnvironment.ConnectClientAsync();
  580. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  581. await Task.Delay(500);
  582. Assert.AreEqual(1, serverStorage.Messages.Count);
  583. }
  584. }
  585. [TestMethod]
  586. public async Task Publish_After_Client_Connects()
  587. {
  588. using (var testEnvironment = new TestEnvironment())
  589. {
  590. var server = await testEnvironment.StartServerAsync();
  591. server.UseClientConnectedHandler(async e =>
  592. {
  593. await server.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
  594. });
  595. string receivedTopic = null;
  596. var c1 = await testEnvironment.ConnectClientAsync();
  597. c1.UseApplicationMessageReceivedHandler(e => { receivedTopic = e.ApplicationMessage.Topic; });
  598. await c1.SubscribeAsync("#");
  599. await testEnvironment.ConnectClientAsync();
  600. await testEnvironment.ConnectClientAsync();
  601. await testEnvironment.ConnectClientAsync();
  602. await testEnvironment.ConnectClientAsync();
  603. await Task.Delay(500);
  604. Assert.AreEqual("/test/1", receivedTopic);
  605. }
  606. }
  607. [TestMethod]
  608. public async Task Intercept_Message()
  609. {
  610. void Interceptor(MqttApplicationMessageInterceptorContext context)
  611. {
  612. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  613. }
  614. using (var testEnvironment = new TestEnvironment())
  615. {
  616. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor));
  617. var c1 = await testEnvironment.ConnectClientAsync();
  618. var c2 = await testEnvironment.ConnectClientAsync();
  619. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  620. var isIntercepted = false;
  621. c2.UseApplicationMessageReceivedHandler(c =>
  622. {
  623. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  624. });
  625. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
  626. await c1.DisconnectAsync();
  627. await Task.Delay(500);
  628. Assert.IsTrue(isIntercepted);
  629. }
  630. }
  631. [TestMethod]
  632. public async Task Send_Long_Body()
  633. {
  634. using (var testEnvironment = new TestEnvironment())
  635. {
  636. const int PayloadSizeInMB = 30;
  637. const int CharCount = PayloadSizeInMB * 1024 * 1024;
  638. var longBody = new byte[CharCount];
  639. byte @char = 32;
  640. for (long i = 0; i < PayloadSizeInMB * 1024L * 1024L; i++)
  641. {
  642. longBody[i] = @char;
  643. @char++;
  644. if (@char > 126)
  645. {
  646. @char = 32;
  647. }
  648. }
  649. byte[] receivedBody = null;
  650. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  651. var client1 = await testEnvironment.ConnectClientAsync();
  652. client1.UseApplicationMessageReceivedHandler(c =>
  653. {
  654. receivedBody = c.ApplicationMessage.Payload;
  655. });
  656. await client1.SubscribeAsync("string");
  657. var client2 = await testEnvironment.ConnectClientAsync();
  658. await client2.PublishAsync("string", longBody);
  659. await Task.Delay(500);
  660. Assert.IsTrue(longBody.SequenceEqual(receivedBody ?? new byte[0]));
  661. }
  662. }
  663. [TestMethod]
  664. public async Task Deny_Connection()
  665. {
  666. var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
  667. {
  668. context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
  669. });
  670. using (var testEnvironment = new TestEnvironment())
  671. {
  672. testEnvironment.IgnoreClientLogErrors = true;
  673. await testEnvironment.StartServerAsync(serverOptions);
  674. try
  675. {
  676. await testEnvironment.ConnectClientAsync();
  677. Assert.Fail("An exception should be raised.");
  678. }
  679. catch (Exception exception)
  680. {
  681. if (exception is MqttConnectingFailedException connectingFailedException)
  682. {
  683. Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
  684. }
  685. else
  686. {
  687. Assert.Fail("Wrong exception.");
  688. }
  689. }
  690. }
  691. }
  692. [TestMethod]
  693. public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  694. {
  695. using (var testEnvironment = new TestEnvironment())
  696. {
  697. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  698. var events = new List<string>();
  699. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
  700. {
  701. lock (events)
  702. {
  703. events.Add("c");
  704. }
  705. });
  706. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
  707. {
  708. lock (events)
  709. {
  710. events.Add("d");
  711. }
  712. });
  713. var clientOptions = new MqttClientOptionsBuilder()
  714. .WithClientId("same_id");
  715. // c
  716. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  717. await Task.Delay(500);
  718. var flow = string.Join(string.Empty, events);
  719. Assert.AreEqual("c", flow);
  720. // dc
  721. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  722. await Task.Delay(500);
  723. flow = string.Join(string.Empty, events);
  724. Assert.AreEqual("cdc", flow);
  725. // nothing
  726. await c1.DisconnectAsync();
  727. await Task.Delay(500);
  728. // d
  729. await c2.DisconnectAsync();
  730. await Task.Delay(500);
  731. await server.StopAsync();
  732. flow = string.Join(string.Empty, events);
  733. Assert.AreEqual("cdcd", flow);
  734. }
  735. }
  736. [TestMethod]
  737. public async Task Remove_Session()
  738. {
  739. using (var testEnvironment = new TestEnvironment())
  740. {
  741. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  742. var clientOptions = new MqttClientOptionsBuilder();
  743. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  744. await Task.Delay(500);
  745. Assert.AreEqual(1, (await server.GetClientStatusAsync()).Count);
  746. await c1.DisconnectAsync();
  747. await Task.Delay(500);
  748. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  749. }
  750. }
  751. [TestMethod]
  752. public async Task Stop_And_Restart()
  753. {
  754. using (var testEnvironment = new TestEnvironment())
  755. {
  756. testEnvironment.IgnoreClientLogErrors = true;
  757. var server = await testEnvironment.StartServerAsync();
  758. await testEnvironment.ConnectClientAsync();
  759. await server.StopAsync();
  760. try
  761. {
  762. await testEnvironment.ConnectClientAsync();
  763. Assert.Fail("Connecting should fail.");
  764. }
  765. catch (Exception)
  766. {
  767. }
  768. await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  769. await testEnvironment.ConnectClientAsync();
  770. }
  771. }
  772. [TestMethod]
  773. public async Task Close_Idle_Connection()
  774. {
  775. using (var testEnvironment = new TestEnvironment())
  776. {
  777. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  778. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  779. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  780. // Don't send anything. The server should close the connection.
  781. await Task.Delay(TimeSpan.FromSeconds(3));
  782. try
  783. {
  784. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  785. if (receivedBytes == 0)
  786. {
  787. return;
  788. }
  789. Assert.Fail("Receive should throw an exception.");
  790. }
  791. catch (SocketException)
  792. {
  793. }
  794. }
  795. }
  796. [TestMethod]
  797. public async Task Send_Garbage()
  798. {
  799. using (var testEnvironment = new TestEnvironment())
  800. {
  801. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  802. // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
  803. // forever. This is security related.
  804. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  805. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  806. await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None);
  807. await Task.Delay(TimeSpan.FromSeconds(3));
  808. try
  809. {
  810. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  811. if (receivedBytes == 0)
  812. {
  813. return;
  814. }
  815. Assert.Fail("Receive should throw an exception.");
  816. }
  817. catch (SocketException)
  818. {
  819. }
  820. }
  821. }
  822. [TestMethod]
  823. public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  824. {
  825. using (var testEnvironment = new TestEnvironment())
  826. {
  827. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
  828. {
  829. // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
  830. if (c.TopicFilter.Topic == "n")
  831. {
  832. c.AcceptSubscription = false;
  833. }
  834. }));
  835. // Prepare some retained messages.
  836. var client1 = await testEnvironment.ConnectClientAsync();
  837. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("y").WithPayload("x").WithRetainFlag().Build());
  838. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("n").WithPayload("x").WithRetainFlag().Build());
  839. await client1.DisconnectAsync();
  840. await Task.Delay(500);
  841. // Subscribe to all retained message types.
  842. // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
  843. var client2 = await testEnvironment.ConnectClientAsync();
  844. var buffer = new StringBuilder();
  845. client2.UseApplicationMessageReceivedHandler(c =>
  846. {
  847. lock (buffer)
  848. {
  849. buffer.Append(c.ApplicationMessage.Topic);
  850. }
  851. });
  852. await client2.SubscribeAsync(new TopicFilter { Topic = "y" }, new TopicFilter { Topic = "n" });
  853. await Task.Delay(500);
  854. Assert.AreEqual("y", buffer.ToString());
  855. }
  856. }
  857. [TestMethod]
  858. public async Task Collect_Messages_In_Disconnected_Session()
  859. {
  860. using (var testEnvironment = new TestEnvironment())
  861. {
  862. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());
  863. // Create the session including the subscription.
  864. var client1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
  865. await client1.SubscribeAsync("x");
  866. await client1.DisconnectAsync();
  867. await Task.Delay(500);
  868. var clientStatus = await server.GetClientStatusAsync();
  869. Assert.AreEqual(0, clientStatus.Count);
  870. var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
  871. await client2.PublishAsync("x", "1");
  872. await client2.PublishAsync("x", "2");
  873. await client2.PublishAsync("x", "3");
  874. await client2.DisconnectAsync();
  875. await Task.Delay(500);
  876. clientStatus = await server.GetClientStatusAsync();
  877. var sessionStatus = await server.GetSessionStatusAsync();
  878. Assert.AreEqual(0, clientStatus.Count);
  879. Assert.AreEqual(2, sessionStatus.Count);
  880. Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == "a").PendingApplicationMessagesCount);
  881. }
  882. }
  883. private static async Task TestPublishAsync(
  884. string topic,
  885. MqttQualityOfServiceLevel qualityOfServiceLevel,
  886. string topicFilter,
  887. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  888. int expectedReceivedMessagesCount)
  889. {
  890. using (var testEnvironment = new TestEnvironment())
  891. {
  892. var receivedMessagesCount = 0;
  893. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  894. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
  895. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  896. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  897. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));
  898. await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
  899. await c2.DisconnectAsync().ConfigureAwait(false);
  900. await Task.Delay(500);
  901. await c1.UnsubscribeAsync(topicFilter);
  902. await Task.Delay(500);
  903. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  904. }
  905. }
  906. }
  907. }