Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

971 řádky
35 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Microsoft.VisualStudio.TestTools.UnitTesting;
  9. using MQTTnet.Adapter;
  10. using MQTTnet.Client;
  11. using MQTTnet.Client.Connecting;
  12. using MQTTnet.Client.Options;
  13. using MQTTnet.Protocol;
  14. using MQTTnet.Server;
  15. using MQTTnet.Tests.Mockups;
  16. namespace MQTTnet.Tests
  17. {
  18. [TestClass]
  19. public class Server_Tests
  20. {
  21. [TestMethod]
  22. public async Task Publish_At_Most_Once_0x00()
  23. {
  24. await TestPublishAsync(
  25. "A/B/C",
  26. MqttQualityOfServiceLevel.AtMostOnce,
  27. "A/B/C",
  28. MqttQualityOfServiceLevel.AtMostOnce,
  29. 1);
  30. }
  31. [TestMethod]
  32. public async Task Publish_At_Least_Once_0x01()
  33. {
  34. await TestPublishAsync(
  35. "A/B/C",
  36. MqttQualityOfServiceLevel.AtLeastOnce,
  37. "A/B/C",
  38. MqttQualityOfServiceLevel.AtLeastOnce,
  39. 1);
  40. }
  41. [TestMethod]
  42. public async Task Publish_Exactly_Once_0x02()
  43. {
  44. await TestPublishAsync(
  45. "A/B/C",
  46. MqttQualityOfServiceLevel.ExactlyOnce,
  47. "A/B/C",
  48. MqttQualityOfServiceLevel.ExactlyOnce,
  49. 1);
  50. }
  51. [TestMethod]
  52. public async Task Will_Message_Do_Not_Send()
  53. {
  54. using (var testEnvironment = new TestEnvironment())
  55. {
  56. var receivedMessagesCount = 0;
  57. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  58. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  59. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  60. var c1 = await testEnvironment.ConnectClientAsync();
  61. c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  62. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  63. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  64. await c2.DisconnectAsync().ConfigureAwait(false);
  65. await Task.Delay(1000);
  66. Assert.AreEqual(0, receivedMessagesCount);
  67. }
  68. }
  69. [TestMethod]
  70. public async Task Will_Message_Send()
  71. {
  72. using (var testEnvironment = new TestEnvironment())
  73. {
  74. var receivedMessagesCount = 0;
  75. await testEnvironment.StartServerAsync();
  76. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  77. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  78. var c1 = await testEnvironment.ConnectClientAsync();
  79. c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  80. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  81. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  82. c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
  83. await Task.Delay(1000);
  84. Assert.AreEqual(1, receivedMessagesCount);
  85. }
  86. }
  87. [TestMethod]
  88. public async Task Subscribe_Unsubscribe()
  89. {
  90. using (var testEnvironment = new TestEnvironment())
  91. {
  92. var receivedMessagesCount = 0;
  93. var server = await testEnvironment.StartServerAsync();
  94. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
  95. c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  96. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
  97. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  98. await c2.PublishAsync(message);
  99. await Task.Delay(500);
  100. Assert.AreEqual(0, receivedMessagesCount);
  101. var subscribeEventCalled = false;
  102. server.ClientSubscribedTopic += (_, e) =>
  103. {
  104. subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
  105. };
  106. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  107. await Task.Delay(250);
  108. Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");
  109. await c2.PublishAsync(message);
  110. await Task.Delay(250);
  111. Assert.AreEqual(1, receivedMessagesCount);
  112. var unsubscribeEventCalled = false;
  113. server.ClientUnsubscribedTopic += (_, e) =>
  114. {
  115. unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
  116. };
  117. await c1.UnsubscribeAsync("a");
  118. await Task.Delay(250);
  119. Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");
  120. await c2.PublishAsync(message);
  121. await Task.Delay(500);
  122. Assert.AreEqual(1, receivedMessagesCount);
  123. await Task.Delay(500);
  124. Assert.AreEqual(1, receivedMessagesCount);
  125. }
  126. }
  127. [TestMethod]
  128. public async Task Publish_From_Server()
  129. {
  130. using (var testEnvironment = new TestEnvironment())
  131. {
  132. var server = await testEnvironment.StartServerAsync();
  133. var receivedMessagesCount = 0;
  134. var client = await testEnvironment.ConnectClientAsync();
  135. client.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  136. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  137. await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  138. await server.PublishAsync(message);
  139. await Task.Delay(1000);
  140. Assert.AreEqual(1, receivedMessagesCount);
  141. }
  142. }
  143. [TestMethod]
  144. public async Task Publish_Multiple_Clients()
  145. {
  146. var receivedMessagesCount = 0;
  147. var locked = new object();
  148. using (var testEnvironment = new TestEnvironment())
  149. {
  150. await testEnvironment.StartServerAsync();
  151. var c1 = await testEnvironment.ConnectClientAsync();
  152. var c2 = await testEnvironment.ConnectClientAsync();
  153. c1.UseReceivedApplicationMessageHandler(c =>
  154. {
  155. lock (locked)
  156. {
  157. receivedMessagesCount++;
  158. }
  159. });
  160. c2.UseReceivedApplicationMessageHandler(c =>
  161. {
  162. lock (locked)
  163. {
  164. receivedMessagesCount++;
  165. }
  166. });
  167. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  168. await c2.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  169. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  170. for (var i = 0; i < 1000; i++)
  171. {
  172. await c1.PublishAsync(message);
  173. }
  174. await Task.Delay(500);
  175. Assert.AreEqual(2000, receivedMessagesCount);
  176. }
  177. }
  178. [TestMethod]
  179. public async Task Session_Takeover()
  180. {
  181. using (var testEnvironment = new TestEnvironment())
  182. {
  183. await testEnvironment.StartServerAsync();
  184. var options = new MqttClientOptionsBuilder()
  185. .WithCleanSession(false)
  186. .WithClientId("a");
  187. var client1 = await testEnvironment.ConnectClientAsync(options);
  188. await Task.Delay(500);
  189. var client2 = await testEnvironment.ConnectClientAsync(options);
  190. await Task.Delay(500);
  191. Assert.IsFalse(client1.IsConnected);
  192. Assert.IsTrue(client2.IsConnected);
  193. }
  194. }
  195. [TestMethod]
  196. public async Task No_Messages_If_No_Subscription()
  197. {
  198. using (var testEnvironment = new TestEnvironment())
  199. {
  200. await testEnvironment.StartServerAsync();
  201. var client = await testEnvironment.ConnectClientAsync();
  202. var receivedMessages = new List<MqttApplicationMessage>();
  203. client.Connected += async (s, e) =>
  204. {
  205. await client.PublishAsync("Connected");
  206. };
  207. client.UseReceivedApplicationMessageHandler(c =>
  208. {
  209. lock (receivedMessages)
  210. {
  211. receivedMessages.Add(c.ApplicationMessage);
  212. }
  213. });
  214. await Task.Delay(500);
  215. await client.PublishAsync("Hello");
  216. await Task.Delay(500);
  217. Assert.AreEqual(0, receivedMessages.Count);
  218. }
  219. }
  220. [TestMethod]
  221. public async Task Set_Subscription_At_Server()
  222. {
  223. using (var testEnvironment = new TestEnvironment())
  224. {
  225. var server = await testEnvironment.StartServerAsync();
  226. server.ClientConnected += async (s, e) =>
  227. {
  228. await server.SubscribeAsync(e.ClientId, "topic1");
  229. };
  230. var client = await testEnvironment.ConnectClientAsync();
  231. var receivedMessages = new List<MqttApplicationMessage>();
  232. client.UseReceivedApplicationMessageHandler(c =>
  233. {
  234. lock (receivedMessages)
  235. {
  236. receivedMessages.Add(c.ApplicationMessage);
  237. }
  238. });
  239. await Task.Delay(500);
  240. await client.PublishAsync("Hello");
  241. await Task.Delay(100);
  242. Assert.AreEqual(0, receivedMessages.Count);
  243. await client.PublishAsync("topic1");
  244. await Task.Delay(100);
  245. Assert.AreEqual(1, receivedMessages.Count);
  246. }
  247. }
  248. [TestMethod]
  249. public async Task Shutdown_Disconnects_Clients_Gracefully()
  250. {
  251. using (var testEnvironment = new TestEnvironment())
  252. {
  253. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  254. var disconnectCalled = 0;
  255. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  256. c1.Disconnected += (sender, args) => disconnectCalled++;
  257. await Task.Delay(100);
  258. await server.StopAsync();
  259. await Task.Delay(100);
  260. Assert.AreEqual(1, disconnectCalled);
  261. }
  262. }
  263. [TestMethod]
  264. public async Task Handle_Clean_Disconnect()
  265. {
  266. using (var testEnvironment = new TestEnvironment())
  267. {
  268. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  269. var clientConnectedCalled = 0;
  270. var clientDisconnectedCalled = 0;
  271. server.ClientConnected += (_, __) => Interlocked.Increment(ref clientConnectedCalled);
  272. server.ClientDisconnected += (_, __) => Interlocked.Increment(ref clientDisconnectedCalled);
  273. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  274. Assert.AreEqual(1, clientConnectedCalled);
  275. Assert.AreEqual(0, clientDisconnectedCalled);
  276. await Task.Delay(500);
  277. await c1.DisconnectAsync();
  278. await Task.Delay(500);
  279. Assert.AreEqual(1, clientConnectedCalled);
  280. Assert.AreEqual(1, clientDisconnectedCalled);
  281. }
  282. }
  283. [TestMethod]
  284. public async Task Client_Disconnect_Without_Errors()
  285. {
  286. using (var testEnvironment = new TestEnvironment())
  287. {
  288. bool clientWasConnected;
  289. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  290. try
  291. {
  292. var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  293. clientWasConnected = true;
  294. await client.DisconnectAsync();
  295. await Task.Delay(500);
  296. }
  297. finally
  298. {
  299. await server.StopAsync();
  300. }
  301. Assert.IsTrue(clientWasConnected);
  302. testEnvironment.ThrowIfLogErrors();
  303. }
  304. }
  305. [TestMethod]
  306. public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  307. {
  308. const int ClientCount = 50;
  309. using (var testEnvironment = new TestEnvironment())
  310. {
  311. var server = await testEnvironment.StartServerAsync();
  312. var tasks = new List<Task>();
  313. for (var i = 0; i < ClientCount; i++)
  314. {
  315. var i2 = i;
  316. var testEnvironment2 = testEnvironment;
  317. tasks.Add(Task.Run(async () =>
  318. {
  319. try
  320. {
  321. using (var client = await testEnvironment2.ConnectClientAsync())
  322. {
  323. // Clear retained message.
  324. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  325. .WithPayload(new byte[0]).WithRetainFlag().Build());
  326. // Set retained message.
  327. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  328. .WithPayload("value").WithRetainFlag().Build());
  329. await client.DisconnectAsync();
  330. }
  331. }
  332. catch (Exception exception)
  333. {
  334. testEnvironment2.TrackException(exception);
  335. }
  336. }));
  337. }
  338. await Task.WhenAll(tasks);
  339. await Task.Delay(1000);
  340. var retainedMessages = await server.GetRetainedMessagesAsync();
  341. Assert.AreEqual(ClientCount, retainedMessages.Count);
  342. for (var i = 0; i < ClientCount; i++)
  343. {
  344. Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i));
  345. }
  346. }
  347. }
  348. [TestMethod]
  349. public async Task Retained_Messages_Flow()
  350. {
  351. using (var testEnvironment = new TestEnvironment())
  352. {
  353. var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
  354. await testEnvironment.StartServerAsync();
  355. var c1 = await testEnvironment.ConnectClientAsync();
  356. var receivedMessages = 0;
  357. var c2 = await testEnvironment.ConnectClientAsync();
  358. c2.UseReceivedApplicationMessageHandler(c =>
  359. {
  360. Interlocked.Increment(ref receivedMessages);
  361. });
  362. await c1.PublishAsync(retainedMessage);
  363. await c1.DisconnectAsync();
  364. await Task.Delay(500);
  365. for (var i = 0; i < 5; i++)
  366. {
  367. await c2.UnsubscribeAsync("r");
  368. await Task.Delay(100);
  369. Assert.AreEqual(i, receivedMessages);
  370. await c2.SubscribeAsync("r");
  371. await Task.Delay(100);
  372. Assert.AreEqual(i + 1, receivedMessages);
  373. }
  374. await c2.DisconnectAsync();
  375. }
  376. }
  377. [TestMethod]
  378. public async Task Receive_No_Retained_Message_After_Subscribe()
  379. {
  380. using (var testEnvironment = new TestEnvironment())
  381. {
  382. await testEnvironment.StartServerAsync();
  383. var c1 = await testEnvironment.ConnectClientAsync();
  384. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  385. await c1.DisconnectAsync();
  386. var receivedMessagesCount = 0;
  387. var c2 = await testEnvironment.ConnectClientAsync();
  388. c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  389. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build());
  390. await Task.Delay(500);
  391. Assert.AreEqual(0, receivedMessagesCount);
  392. }
  393. }
  394. [TestMethod]
  395. public async Task Receive_Retained_Message_After_Subscribe()
  396. {
  397. using (var testEnvironment = new TestEnvironment())
  398. {
  399. await testEnvironment.StartServerAsync();
  400. var c1 = await testEnvironment.ConnectClientAsync();
  401. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  402. await c1.DisconnectAsync();
  403. var receivedMessages = new List<MqttApplicationMessage>();
  404. var c2 = await testEnvironment.ConnectClientAsync();
  405. c2.UseReceivedApplicationMessageHandler(c =>
  406. {
  407. lock (receivedMessages)
  408. {
  409. receivedMessages.Add(c.ApplicationMessage);
  410. }
  411. });
  412. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  413. await Task.Delay(500);
  414. Assert.AreEqual(1, receivedMessages.Count);
  415. Assert.IsTrue(receivedMessages.First().Retain);
  416. }
  417. }
  418. [TestMethod]
  419. public async Task Clear_Retained_Message()
  420. {
  421. using (var testEnvironment = new TestEnvironment())
  422. {
  423. var receivedMessagesCount = 0;
  424. await testEnvironment.StartServerAsync();
  425. var c1 = await testEnvironment.ConnectClientAsync();
  426. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  427. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
  428. await c1.DisconnectAsync();
  429. var c2 = await testEnvironment.ConnectClientAsync();
  430. c2.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  431. await Task.Delay(200);
  432. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  433. await Task.Delay(500);
  434. Assert.AreEqual(0, receivedMessagesCount);
  435. }
  436. }
  437. [TestMethod]
  438. public async Task Persist_Retained_Message()
  439. {
  440. var serverStorage = new TestServerStorage();
  441. using (var testEnvironment = new TestEnvironment())
  442. {
  443. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage));
  444. var c1 = await testEnvironment.ConnectClientAsync();
  445. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  446. await Task.Delay(500);
  447. Assert.AreEqual(1, serverStorage.Messages.Count);
  448. }
  449. }
  450. [TestMethod]
  451. public async Task Intercept_Message()
  452. {
  453. void Interceptor(MqttApplicationMessageInterceptorContext context)
  454. {
  455. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  456. }
  457. using (var testEnvironment = new TestEnvironment())
  458. {
  459. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor));
  460. var c1 = await testEnvironment.ConnectClientAsync();
  461. var c2 = await testEnvironment.ConnectClientAsync();
  462. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  463. var isIntercepted = false;
  464. c2.UseReceivedApplicationMessageHandler(c =>
  465. {
  466. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  467. });
  468. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
  469. await c1.DisconnectAsync();
  470. await Task.Delay(500);
  471. Assert.IsTrue(isIntercepted);
  472. }
  473. }
  474. [TestMethod]
  475. public async Task Send_Long_Body()
  476. {
  477. using (var testEnvironment = new TestEnvironment())
  478. {
  479. const int PayloadSizeInMB = 30;
  480. const int CharCount = PayloadSizeInMB * 1024 * 1024;
  481. var longBody = new byte[CharCount];
  482. byte @char = 32;
  483. for (long i = 0; i < PayloadSizeInMB * 1024L * 1024L; i++)
  484. {
  485. longBody[i] = @char;
  486. @char++;
  487. if (@char > 126)
  488. {
  489. @char = 32;
  490. }
  491. }
  492. byte[] receivedBody = null;
  493. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  494. var client1 = await testEnvironment.ConnectClientAsync();
  495. client1.UseReceivedApplicationMessageHandler(c =>
  496. {
  497. receivedBody = c.ApplicationMessage.Payload;
  498. });
  499. await client1.SubscribeAsync("string");
  500. var client2 = await testEnvironment.ConnectClientAsync();
  501. await client2.PublishAsync("string", longBody);
  502. await Task.Delay(500);
  503. Assert.IsTrue(longBody.SequenceEqual(receivedBody ?? new byte[0]));
  504. }
  505. }
  506. [TestMethod]
  507. public async Task Deny_Connection()
  508. {
  509. var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
  510. {
  511. context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
  512. });
  513. using (var testEnvironment = new TestEnvironment())
  514. {
  515. testEnvironment.IgnoreClientLogErrors = true;
  516. await testEnvironment.StartServerAsync(serverOptions);
  517. try
  518. {
  519. await testEnvironment.ConnectClientAsync();
  520. Assert.Fail("An exception should be raised.");
  521. }
  522. catch (Exception exception)
  523. {
  524. if (exception is MqttConnectingFailedException connectingFailedException)
  525. {
  526. Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
  527. }
  528. else
  529. {
  530. Assert.Fail("Wrong exception.");
  531. }
  532. }
  533. }
  534. }
  535. [TestMethod]
  536. public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  537. {
  538. using (var testEnvironment = new TestEnvironment())
  539. {
  540. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  541. var events = new List<string>();
  542. server.ClientConnected += (_, __) =>
  543. {
  544. lock (events)
  545. {
  546. events.Add("c");
  547. }
  548. };
  549. server.ClientDisconnected += (_, __) =>
  550. {
  551. lock (events)
  552. {
  553. events.Add("d");
  554. }
  555. };
  556. var clientOptions = new MqttClientOptionsBuilder()
  557. .WithClientId("same_id");
  558. // c
  559. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  560. await Task.Delay(500);
  561. var flow = string.Join(string.Empty, events);
  562. Assert.AreEqual("c", flow);
  563. // dc
  564. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  565. await Task.Delay(500);
  566. flow = string.Join(string.Empty, events);
  567. Assert.AreEqual("cdc", flow);
  568. // nothing
  569. await c1.DisconnectAsync();
  570. await Task.Delay(500);
  571. // d
  572. await c2.DisconnectAsync();
  573. await Task.Delay(500);
  574. await server.StopAsync();
  575. flow = string.Join(string.Empty, events);
  576. Assert.AreEqual("cdcd", flow);
  577. }
  578. }
  579. [TestMethod]
  580. public async Task Remove_Session()
  581. {
  582. using (var testEnvironment = new TestEnvironment())
  583. {
  584. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  585. var clientOptions = new MqttClientOptionsBuilder();
  586. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  587. await Task.Delay(500);
  588. Assert.AreEqual(1, (await server.GetClientStatusAsync()).Count);
  589. await c1.DisconnectAsync();
  590. await Task.Delay(500);
  591. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  592. }
  593. }
  594. [TestMethod]
  595. public async Task Stop_And_Restart()
  596. {
  597. using (var testEnvironment = new TestEnvironment())
  598. {
  599. testEnvironment.IgnoreClientLogErrors = true;
  600. var server = await testEnvironment.StartServerAsync();
  601. await testEnvironment.ConnectClientAsync();
  602. await server.StopAsync();
  603. try
  604. {
  605. await testEnvironment.ConnectClientAsync();
  606. Assert.Fail("Connecting should fail.");
  607. }
  608. catch (Exception)
  609. {
  610. }
  611. await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  612. await testEnvironment.ConnectClientAsync();
  613. }
  614. }
  615. [TestMethod]
  616. public async Task Close_Idle_Connection()
  617. {
  618. using (var testEnvironment = new TestEnvironment())
  619. {
  620. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  621. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  622. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  623. // Don't send anything. The server should close the connection.
  624. await Task.Delay(TimeSpan.FromSeconds(3));
  625. try
  626. {
  627. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  628. if (receivedBytes == 0)
  629. {
  630. return;
  631. }
  632. Assert.Fail("Receive should throw an exception.");
  633. }
  634. catch (SocketException)
  635. {
  636. }
  637. }
  638. }
  639. [TestMethod]
  640. public async Task Send_Garbage()
  641. {
  642. using (var testEnvironment = new TestEnvironment())
  643. {
  644. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  645. // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
  646. // forever. This is security related.
  647. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  648. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  649. await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None);
  650. await Task.Delay(TimeSpan.FromSeconds(3));
  651. try
  652. {
  653. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  654. if (receivedBytes == 0)
  655. {
  656. return;
  657. }
  658. Assert.Fail("Receive should throw an exception.");
  659. }
  660. catch (SocketException)
  661. {
  662. }
  663. }
  664. }
  665. [TestMethod]
  666. public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  667. {
  668. using (var testEnvironment = new TestEnvironment())
  669. {
  670. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
  671. {
  672. // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
  673. if (c.TopicFilter.Topic == "n")
  674. {
  675. c.AcceptSubscription = false;
  676. }
  677. }));
  678. // Prepare some retained messages.
  679. var client1 = await testEnvironment.ConnectClientAsync();
  680. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("y").WithPayload("x").WithRetainFlag().Build());
  681. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("n").WithPayload("x").WithRetainFlag().Build());
  682. await client1.DisconnectAsync();
  683. await Task.Delay(500);
  684. // Subscribe to all retained message types.
  685. // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
  686. var client2 = await testEnvironment.ConnectClientAsync();
  687. var buffer = new StringBuilder();
  688. client2.UseReceivedApplicationMessageHandler(c =>
  689. {
  690. lock (buffer)
  691. {
  692. buffer.Append(c.ApplicationMessage.Topic);
  693. }
  694. });
  695. await client2.SubscribeAsync(new TopicFilter { Topic = "y" }, new TopicFilter { Topic = "n" });
  696. await Task.Delay(500);
  697. Assert.AreEqual("y", buffer.ToString());
  698. }
  699. }
  700. [TestMethod]
  701. public async Task Collect_Messages_In_Disconnected_Session()
  702. {
  703. using (var testEnvironment = new TestEnvironment())
  704. {
  705. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());
  706. // Create the session including the subscription.
  707. var client1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
  708. await client1.SubscribeAsync("x");
  709. await client1.DisconnectAsync();
  710. await Task.Delay(500);
  711. var clientStatus = await server.GetClientStatusAsync();
  712. Assert.AreEqual(0, clientStatus.Count);
  713. var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
  714. await client2.PublishAsync("x", "1");
  715. await client2.PublishAsync("x", "2");
  716. await client2.PublishAsync("x", "3");
  717. await client2.DisconnectAsync();
  718. await Task.Delay(500);
  719. clientStatus = await server.GetClientStatusAsync();
  720. var sessionStatus = await server.GetSessionStatusAsync();
  721. Assert.AreEqual(0, clientStatus.Count);
  722. Assert.AreEqual(2, sessionStatus.Count);
  723. Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == "a").PendingApplicationMessagesCount);
  724. }
  725. }
  726. private static async Task TestPublishAsync(
  727. string topic,
  728. MqttQualityOfServiceLevel qualityOfServiceLevel,
  729. string topicFilter,
  730. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  731. int expectedReceivedMessagesCount)
  732. {
  733. using (var testEnvironment = new TestEnvironment())
  734. {
  735. var receivedMessagesCount = 0;
  736. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  737. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
  738. c1.UseReceivedApplicationMessageHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  739. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  740. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));
  741. await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
  742. await c2.DisconnectAsync().ConfigureAwait(false);
  743. await Task.Delay(500);
  744. await c1.UnsubscribeAsync(topicFilter);
  745. await Task.Delay(500);
  746. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  747. }
  748. }
  749. }
  750. }