You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1194 lines
44 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Microsoft.VisualStudio.TestTools.UnitTesting;
  9. using MQTTnet.Adapter;
  10. using MQTTnet.Client;
  11. using MQTTnet.Client.Connecting;
  12. using MQTTnet.Client.Disconnecting;
  13. using MQTTnet.Client.Options;
  14. using MQTTnet.Client.Receiving;
  15. using MQTTnet.Client.Subscribing;
  16. using MQTTnet.Protocol;
  17. using MQTTnet.Server;
  18. using MQTTnet.Tests.Mockups;
  19. namespace MQTTnet.Tests
  20. {
  21. [TestClass]
  22. public class Server_Tests
  23. {
  24. [TestMethod]
  25. public async Task Use_Empty_Client_ID()
  26. {
  27. using (var testEnvironment = new TestEnvironment())
  28. {
  29. await testEnvironment.StartServerAsync();
  30. var client = testEnvironment.CreateClient();
  31. var clientOptions = new MqttClientOptionsBuilder()
  32. .WithTcpServer("localhost", testEnvironment.ServerPort)
  33. .WithClientId(string.Empty)
  34. .Build();
  35. var connectResult = await client.ConnectAsync(clientOptions);
  36. Assert.IsFalse(connectResult.IsSessionPresent);
  37. Assert.IsTrue(client.IsConnected);
  38. }
  39. }
  40. [TestMethod]
  41. public async Task Publish_At_Most_Once_0x00()
  42. {
  43. await TestPublishAsync(
  44. "A/B/C",
  45. MqttQualityOfServiceLevel.AtMostOnce,
  46. "A/B/C",
  47. MqttQualityOfServiceLevel.AtMostOnce,
  48. 1);
  49. }
  50. [TestMethod]
  51. public async Task Publish_At_Least_Once_0x01()
  52. {
  53. await TestPublishAsync(
  54. "A/B/C",
  55. MqttQualityOfServiceLevel.AtLeastOnce,
  56. "A/B/C",
  57. MqttQualityOfServiceLevel.AtLeastOnce,
  58. 1);
  59. }
  60. [TestMethod]
  61. public async Task Publish_Exactly_Once_0x02()
  62. {
  63. await TestPublishAsync(
  64. "A/B/C",
  65. MqttQualityOfServiceLevel.ExactlyOnce,
  66. "A/B/C",
  67. MqttQualityOfServiceLevel.ExactlyOnce,
  68. 1);
  69. }
  70. [TestMethod]
  71. public async Task Use_Clean_Session()
  72. {
  73. using (var testEnvironment = new TestEnvironment())
  74. {
  75. await testEnvironment.StartServerAsync();
  76. var client = testEnvironment.CreateClient();
  77. var connectResult = await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost", testEnvironment.ServerPort).WithCleanSession().Build());
  78. Assert.IsFalse(connectResult.IsSessionPresent);
  79. }
  80. }
  81. [TestMethod]
  82. public async Task Will_Message_Do_Not_Send()
  83. {
  84. using (var testEnvironment = new TestEnvironment())
  85. {
  86. var receivedMessagesCount = 0;
  87. await testEnvironment.StartServerAsync();
  88. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  89. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  90. var c1 = await testEnvironment.ConnectClientAsync();
  91. c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
  92. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  93. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  94. await c2.DisconnectAsync().ConfigureAwait(false);
  95. await Task.Delay(1000);
  96. Assert.AreEqual(0, receivedMessagesCount);
  97. }
  98. }
  99. [TestMethod]
  100. public async Task Will_Message_Send()
  101. {
  102. using (var testEnvironment = new TestEnvironment())
  103. {
  104. var receivedMessagesCount = 0;
  105. await testEnvironment.StartServerAsync();
  106. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  107. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  108. var c1 = await testEnvironment.ConnectClientAsync();
  109. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  110. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  111. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  112. c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
  113. await Task.Delay(1000);
  114. Assert.AreEqual(1, receivedMessagesCount);
  115. }
  116. }
  117. [TestMethod]
  118. public async Task Intercept_Subscription()
  119. {
  120. using (var testEnvironment = new TestEnvironment())
  121. {
  122. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
  123. c =>
  124. {
  125. // Set the topic to "a" regards what the client wants to subscribe.
  126. c.TopicFilter.Topic = "a";
  127. }));
  128. var topicAReceived = false;
  129. var topicBReceived = false;
  130. var client = await testEnvironment.ConnectClientAsync();
  131. client.UseApplicationMessageReceivedHandler(c =>
  132. {
  133. if (c.ApplicationMessage.Topic == "a")
  134. {
  135. topicAReceived = true;
  136. }
  137. else if (c.ApplicationMessage.Topic == "b")
  138. {
  139. topicBReceived = true;
  140. }
  141. });
  142. await client.SubscribeAsync("b");
  143. await client.PublishAsync("a");
  144. await Task.Delay(500);
  145. Assert.IsTrue(topicAReceived);
  146. Assert.IsFalse(topicBReceived);
  147. }
  148. }
  149. [TestMethod]
  150. public async Task Subscribe_Unsubscribe()
  151. {
  152. using (var testEnvironment = new TestEnvironment())
  153. {
  154. var receivedMessagesCount = 0;
  155. var server = await testEnvironment.StartServerAsync();
  156. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
  157. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  158. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
  159. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  160. await c2.PublishAsync(message);
  161. await Task.Delay(500);
  162. Assert.AreEqual(0, receivedMessagesCount);
  163. var subscribeEventCalled = false;
  164. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e =>
  165. {
  166. subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
  167. });
  168. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  169. await Task.Delay(250);
  170. Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");
  171. await c2.PublishAsync(message);
  172. await Task.Delay(250);
  173. Assert.AreEqual(1, receivedMessagesCount);
  174. var unsubscribeEventCalled = false;
  175. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
  176. {
  177. unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
  178. });
  179. await c1.UnsubscribeAsync("a");
  180. await Task.Delay(250);
  181. Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");
  182. await c2.PublishAsync(message);
  183. await Task.Delay(500);
  184. Assert.AreEqual(1, receivedMessagesCount);
  185. await Task.Delay(500);
  186. Assert.AreEqual(1, receivedMessagesCount);
  187. }
  188. }
  189. [TestMethod]
  190. public async Task Subscribe_Multiple_In_Single_Request()
  191. {
  192. using (var testEnvironment = new TestEnvironment())
  193. {
  194. var receivedMessagesCount = 0;
  195. await testEnvironment.StartServerAsync();
  196. var c1 = await testEnvironment.ConnectClientAsync();
  197. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  198. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  199. .WithTopicFilter("a")
  200. .WithTopicFilter("b")
  201. .WithTopicFilter("c")
  202. .Build());
  203. var c2 = await testEnvironment.ConnectClientAsync();
  204. await c2.PublishAsync("a");
  205. await Task.Delay(100);
  206. Assert.AreEqual(receivedMessagesCount, 1);
  207. await c2.PublishAsync("b");
  208. await Task.Delay(100);
  209. Assert.AreEqual(receivedMessagesCount, 2);
  210. await c2.PublishAsync("c");
  211. await Task.Delay(100);
  212. Assert.AreEqual(receivedMessagesCount, 3);
  213. }
  214. }
  215. [TestMethod]
  216. public async Task Subscribe_Multiple_In_Multiple_Request()
  217. {
  218. using (var testEnvironment = new TestEnvironment())
  219. {
  220. var receivedMessagesCount = 0;
  221. await testEnvironment.StartServerAsync();
  222. var c1 = await testEnvironment.ConnectClientAsync();
  223. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  224. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  225. .WithTopicFilter("a")
  226. .Build());
  227. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  228. .WithTopicFilter("b")
  229. .Build());
  230. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  231. .WithTopicFilter("c")
  232. .Build());
  233. var c2 = await testEnvironment.ConnectClientAsync();
  234. await c2.PublishAsync("a");
  235. await Task.Delay(100);
  236. Assert.AreEqual(receivedMessagesCount, 1);
  237. await c2.PublishAsync("b");
  238. await Task.Delay(100);
  239. Assert.AreEqual(receivedMessagesCount, 2);
  240. await c2.PublishAsync("c");
  241. await Task.Delay(100);
  242. Assert.AreEqual(receivedMessagesCount, 3);
  243. }
  244. }
  245. [TestMethod]
  246. public async Task Publish_From_Server()
  247. {
  248. using (var testEnvironment = new TestEnvironment())
  249. {
  250. var server = await testEnvironment.StartServerAsync();
  251. var receivedMessagesCount = 0;
  252. var client = await testEnvironment.ConnectClientAsync();
  253. client.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  254. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  255. await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  256. await server.PublishAsync(message);
  257. await Task.Delay(1000);
  258. Assert.AreEqual(1, receivedMessagesCount);
  259. }
  260. }
  261. [TestMethod]
  262. public async Task Publish_Multiple_Clients()
  263. {
  264. var receivedMessagesCount = 0;
  265. var locked = new object();
  266. using (var testEnvironment = new TestEnvironment())
  267. {
  268. await testEnvironment.StartServerAsync();
  269. var c1 = await testEnvironment.ConnectClientAsync();
  270. var c2 = await testEnvironment.ConnectClientAsync();
  271. c1.UseApplicationMessageReceivedHandler(c =>
  272. {
  273. lock (locked)
  274. {
  275. receivedMessagesCount++;
  276. }
  277. });
  278. c2.UseApplicationMessageReceivedHandler(c =>
  279. {
  280. lock (locked)
  281. {
  282. receivedMessagesCount++;
  283. }
  284. });
  285. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  286. await c2.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  287. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  288. for (var i = 0; i < 1000; i++)
  289. {
  290. await c1.PublishAsync(message);
  291. }
  292. await Task.Delay(500);
  293. Assert.AreEqual(2000, receivedMessagesCount);
  294. }
  295. }
  296. [TestMethod]
  297. public async Task Session_Takeover()
  298. {
  299. using (var testEnvironment = new TestEnvironment())
  300. {
  301. await testEnvironment.StartServerAsync();
  302. var options = new MqttClientOptionsBuilder()
  303. .WithCleanSession(false)
  304. .WithClientId("a");
  305. var client1 = await testEnvironment.ConnectClientAsync(options);
  306. await Task.Delay(500);
  307. var client2 = await testEnvironment.ConnectClientAsync(options);
  308. await Task.Delay(500);
  309. Assert.IsFalse(client1.IsConnected);
  310. Assert.IsTrue(client2.IsConnected);
  311. }
  312. }
  313. [TestMethod]
  314. public async Task No_Messages_If_No_Subscription()
  315. {
  316. using (var testEnvironment = new TestEnvironment())
  317. {
  318. await testEnvironment.StartServerAsync();
  319. var client = await testEnvironment.ConnectClientAsync();
  320. var receivedMessages = new List<MqttApplicationMessage>();
  321. client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
  322. {
  323. await client.PublishAsync("Connected");
  324. });
  325. client.UseApplicationMessageReceivedHandler(c =>
  326. {
  327. lock (receivedMessages)
  328. {
  329. receivedMessages.Add(c.ApplicationMessage);
  330. }
  331. });
  332. await Task.Delay(500);
  333. await client.PublishAsync("Hello");
  334. await Task.Delay(500);
  335. Assert.AreEqual(0, receivedMessages.Count);
  336. }
  337. }
  338. [TestMethod]
  339. public async Task Set_Subscription_At_Server()
  340. {
  341. using (var testEnvironment = new TestEnvironment())
  342. {
  343. var server = await testEnvironment.StartServerAsync();
  344. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
  345. var client = await testEnvironment.ConnectClientAsync();
  346. var receivedMessages = new List<MqttApplicationMessage>();
  347. client.UseApplicationMessageReceivedHandler(c =>
  348. {
  349. lock (receivedMessages)
  350. {
  351. receivedMessages.Add(c.ApplicationMessage);
  352. }
  353. });
  354. await Task.Delay(500);
  355. await client.PublishAsync("Hello");
  356. await Task.Delay(100);
  357. Assert.AreEqual(0, receivedMessages.Count);
  358. await client.PublishAsync("topic1");
  359. await Task.Delay(100);
  360. Assert.AreEqual(1, receivedMessages.Count);
  361. }
  362. }
  363. [TestMethod]
  364. public async Task Shutdown_Disconnects_Clients_Gracefully()
  365. {
  366. using (var testEnvironment = new TestEnvironment())
  367. {
  368. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  369. var disconnectCalled = 0;
  370. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  371. c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => disconnectCalled++);
  372. await Task.Delay(100);
  373. await server.StopAsync();
  374. await Task.Delay(100);
  375. Assert.AreEqual(1, disconnectCalled);
  376. }
  377. }
  378. [TestMethod]
  379. public async Task Handle_Clean_Disconnect()
  380. {
  381. using (var testEnvironment = new TestEnvironment())
  382. {
  383. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  384. var clientConnectedCalled = 0;
  385. var clientDisconnectedCalled = 0;
  386. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref clientConnectedCalled));
  387. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref clientDisconnectedCalled));
  388. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  389. Assert.AreEqual(1, clientConnectedCalled);
  390. Assert.AreEqual(0, clientDisconnectedCalled);
  391. await Task.Delay(500);
  392. await c1.DisconnectAsync();
  393. await Task.Delay(500);
  394. Assert.AreEqual(1, clientConnectedCalled);
  395. Assert.AreEqual(1, clientDisconnectedCalled);
  396. }
  397. }
  398. [TestMethod]
  399. public async Task Client_Disconnect_Without_Errors()
  400. {
  401. using (var testEnvironment = new TestEnvironment())
  402. {
  403. bool clientWasConnected;
  404. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  405. try
  406. {
  407. var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  408. clientWasConnected = true;
  409. await client.DisconnectAsync();
  410. await Task.Delay(500);
  411. }
  412. finally
  413. {
  414. await server.StopAsync();
  415. }
  416. Assert.IsTrue(clientWasConnected);
  417. testEnvironment.ThrowIfLogErrors();
  418. }
  419. }
  420. [TestMethod]
  421. public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  422. {
  423. const int ClientCount = 50;
  424. using (var testEnvironment = new TestEnvironment())
  425. {
  426. var server = await testEnvironment.StartServerAsync();
  427. var tasks = new List<Task>();
  428. for (var i = 0; i < ClientCount; i++)
  429. {
  430. var i2 = i;
  431. var testEnvironment2 = testEnvironment;
  432. tasks.Add(Task.Run(async () =>
  433. {
  434. try
  435. {
  436. using (var client = await testEnvironment2.ConnectClientAsync())
  437. {
  438. // Clear retained message.
  439. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  440. .WithPayload(new byte[0]).WithRetainFlag().Build());
  441. // Set retained message.
  442. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  443. .WithPayload("value").WithRetainFlag().Build());
  444. await client.DisconnectAsync();
  445. }
  446. }
  447. catch (Exception exception)
  448. {
  449. testEnvironment2.TrackException(exception);
  450. }
  451. }));
  452. }
  453. await Task.WhenAll(tasks);
  454. await Task.Delay(1000);
  455. var retainedMessages = await server.GetRetainedApplicationMessagesAsync();
  456. Assert.AreEqual(ClientCount, retainedMessages.Count);
  457. for (var i = 0; i < ClientCount; i++)
  458. {
  459. Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i));
  460. }
  461. }
  462. }
  463. [TestMethod]
  464. public async Task Retained_Messages_Flow()
  465. {
  466. using (var testEnvironment = new TestEnvironment())
  467. {
  468. var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
  469. await testEnvironment.StartServerAsync();
  470. var c1 = await testEnvironment.ConnectClientAsync();
  471. var receivedMessages = 0;
  472. var c2 = await testEnvironment.ConnectClientAsync();
  473. c2.UseApplicationMessageReceivedHandler(c =>
  474. {
  475. Interlocked.Increment(ref receivedMessages);
  476. });
  477. await c1.PublishAsync(retainedMessage);
  478. await c1.DisconnectAsync();
  479. await Task.Delay(500);
  480. for (var i = 0; i < 5; i++)
  481. {
  482. await c2.UnsubscribeAsync("r");
  483. await Task.Delay(100);
  484. Assert.AreEqual(i, receivedMessages);
  485. await c2.SubscribeAsync("r");
  486. await Task.Delay(100);
  487. Assert.AreEqual(i + 1, receivedMessages);
  488. }
  489. await c2.DisconnectAsync();
  490. }
  491. }
  492. [TestMethod]
  493. public async Task Receive_No_Retained_Message_After_Subscribe()
  494. {
  495. using (var testEnvironment = new TestEnvironment())
  496. {
  497. await testEnvironment.StartServerAsync();
  498. var c1 = await testEnvironment.ConnectClientAsync();
  499. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  500. await c1.DisconnectAsync();
  501. var receivedMessagesCount = 0;
  502. var c2 = await testEnvironment.ConnectClientAsync();
  503. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  504. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build());
  505. await Task.Delay(500);
  506. Assert.AreEqual(0, receivedMessagesCount);
  507. }
  508. }
  509. [TestMethod]
  510. public async Task Receive_Retained_Message_After_Subscribe()
  511. {
  512. using (var testEnvironment = new TestEnvironment())
  513. {
  514. await testEnvironment.StartServerAsync();
  515. var c1 = await testEnvironment.ConnectClientAsync();
  516. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  517. await c1.DisconnectAsync();
  518. var receivedMessages = new List<MqttApplicationMessage>();
  519. var c2 = await testEnvironment.ConnectClientAsync();
  520. c2.UseApplicationMessageReceivedHandler(c =>
  521. {
  522. lock (receivedMessages)
  523. {
  524. receivedMessages.Add(c.ApplicationMessage);
  525. }
  526. });
  527. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  528. await Task.Delay(500);
  529. Assert.AreEqual(1, receivedMessages.Count);
  530. Assert.IsTrue(receivedMessages.First().Retain);
  531. }
  532. }
  533. [TestMethod]
  534. public async Task Clear_Retained_Message_With_Empty_Payload()
  535. {
  536. using (var testEnvironment = new TestEnvironment())
  537. {
  538. var receivedMessagesCount = 0;
  539. await testEnvironment.StartServerAsync();
  540. var c1 = await testEnvironment.ConnectClientAsync();
  541. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  542. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
  543. await c1.DisconnectAsync();
  544. var c2 = await testEnvironment.ConnectClientAsync();
  545. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  546. await Task.Delay(200);
  547. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  548. await Task.Delay(500);
  549. Assert.AreEqual(0, receivedMessagesCount);
  550. }
  551. }
  552. [TestMethod]
  553. public async Task Clear_Retained_Message_With_Null_Payload()
  554. {
  555. using (var testEnvironment = new TestEnvironment())
  556. {
  557. var receivedMessagesCount = 0;
  558. await testEnvironment.StartServerAsync();
  559. var c1 = await testEnvironment.ConnectClientAsync();
  560. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  561. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload((byte[])null).WithRetainFlag().Build());
  562. await c1.DisconnectAsync();
  563. var c2 = await testEnvironment.ConnectClientAsync();
  564. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  565. await Task.Delay(200);
  566. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  567. await Task.Delay(500);
  568. Assert.AreEqual(0, receivedMessagesCount);
  569. }
  570. }
  571. [TestMethod]
  572. public async Task Intercept_Application_Message()
  573. {
  574. using (var testEnvironment = new TestEnvironment())
  575. {
  576. await testEnvironment.StartServerAsync(
  577. new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
  578. c => { c.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; }));
  579. string receivedTopic = null;
  580. var c1 = await testEnvironment.ConnectClientAsync();
  581. await c1.SubscribeAsync("#");
  582. c1.UseApplicationMessageReceivedHandler(a => { receivedTopic = a.ApplicationMessage.Topic; });
  583. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("original_topic").Build());
  584. await Task.Delay(500);
  585. Assert.AreEqual("new_topic", receivedTopic);
  586. }
  587. }
  588. [TestMethod]
  589. public async Task Persist_Retained_Message()
  590. {
  591. var serverStorage = new TestServerStorage();
  592. using (var testEnvironment = new TestEnvironment())
  593. {
  594. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage));
  595. var c1 = await testEnvironment.ConnectClientAsync();
  596. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  597. await Task.Delay(500);
  598. Assert.AreEqual(1, serverStorage.Messages.Count);
  599. }
  600. }
  601. [TestMethod]
  602. public async Task Publish_After_Client_Connects()
  603. {
  604. using (var testEnvironment = new TestEnvironment())
  605. {
  606. var server = await testEnvironment.StartServerAsync();
  607. server.UseClientConnectedHandler(async e =>
  608. {
  609. await server.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
  610. });
  611. string receivedTopic = null;
  612. var c1 = await testEnvironment.ConnectClientAsync();
  613. c1.UseApplicationMessageReceivedHandler(e => { receivedTopic = e.ApplicationMessage.Topic; });
  614. await c1.SubscribeAsync("#");
  615. await testEnvironment.ConnectClientAsync();
  616. await testEnvironment.ConnectClientAsync();
  617. await testEnvironment.ConnectClientAsync();
  618. await testEnvironment.ConnectClientAsync();
  619. await Task.Delay(500);
  620. Assert.AreEqual("/test/1", receivedTopic);
  621. }
  622. }
  623. [TestMethod]
  624. public async Task Intercept_Message()
  625. {
  626. void Interceptor(MqttApplicationMessageInterceptorContext context)
  627. {
  628. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  629. }
  630. using (var testEnvironment = new TestEnvironment())
  631. {
  632. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor));
  633. var c1 = await testEnvironment.ConnectClientAsync();
  634. var c2 = await testEnvironment.ConnectClientAsync();
  635. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  636. var isIntercepted = false;
  637. c2.UseApplicationMessageReceivedHandler(c =>
  638. {
  639. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  640. });
  641. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
  642. await c1.DisconnectAsync();
  643. await Task.Delay(500);
  644. Assert.IsTrue(isIntercepted);
  645. }
  646. }
  647. [TestMethod]
  648. public async Task Send_Long_Body()
  649. {
  650. using (var testEnvironment = new TestEnvironment())
  651. {
  652. const int PayloadSizeInMB = 30;
  653. const int CharCount = PayloadSizeInMB * 1024 * 1024;
  654. var longBody = new byte[CharCount];
  655. byte @char = 32;
  656. for (long i = 0; i < PayloadSizeInMB * 1024L * 1024L; i++)
  657. {
  658. longBody[i] = @char;
  659. @char++;
  660. if (@char > 126)
  661. {
  662. @char = 32;
  663. }
  664. }
  665. byte[] receivedBody = null;
  666. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  667. var client1 = await testEnvironment.ConnectClientAsync();
  668. client1.UseApplicationMessageReceivedHandler(c =>
  669. {
  670. receivedBody = c.ApplicationMessage.Payload;
  671. });
  672. await client1.SubscribeAsync("string");
  673. var client2 = await testEnvironment.ConnectClientAsync();
  674. await client2.PublishAsync("string", longBody);
  675. await Task.Delay(500);
  676. Assert.IsTrue(longBody.SequenceEqual(receivedBody ?? new byte[0]));
  677. }
  678. }
  679. [TestMethod]
  680. public async Task Deny_Connection()
  681. {
  682. var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
  683. {
  684. context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
  685. });
  686. using (var testEnvironment = new TestEnvironment())
  687. {
  688. testEnvironment.IgnoreClientLogErrors = true;
  689. await testEnvironment.StartServerAsync(serverOptions);
  690. try
  691. {
  692. await testEnvironment.ConnectClientAsync();
  693. Assert.Fail("An exception should be raised.");
  694. }
  695. catch (Exception exception)
  696. {
  697. if (exception is MqttConnectingFailedException connectingFailedException)
  698. {
  699. Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
  700. }
  701. else
  702. {
  703. Assert.Fail("Wrong exception.");
  704. }
  705. }
  706. }
  707. }
  708. [TestMethod]
  709. public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  710. {
  711. using (var testEnvironment = new TestEnvironment())
  712. {
  713. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  714. var events = new List<string>();
  715. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
  716. {
  717. lock (events)
  718. {
  719. events.Add("c");
  720. }
  721. });
  722. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
  723. {
  724. lock (events)
  725. {
  726. events.Add("d");
  727. }
  728. });
  729. var clientOptions = new MqttClientOptionsBuilder()
  730. .WithClientId("same_id");
  731. // c
  732. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  733. await Task.Delay(500);
  734. var flow = string.Join(string.Empty, events);
  735. Assert.AreEqual("c", flow);
  736. // dc
  737. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  738. await Task.Delay(500);
  739. flow = string.Join(string.Empty, events);
  740. Assert.AreEqual("cdc", flow);
  741. // nothing
  742. await c1.DisconnectAsync();
  743. await Task.Delay(500);
  744. // d
  745. await c2.DisconnectAsync();
  746. await Task.Delay(500);
  747. await server.StopAsync();
  748. flow = string.Join(string.Empty, events);
  749. Assert.AreEqual("cdcd", flow);
  750. }
  751. }
  752. [TestMethod]
  753. public async Task Remove_Session()
  754. {
  755. using (var testEnvironment = new TestEnvironment())
  756. {
  757. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  758. var clientOptions = new MqttClientOptionsBuilder();
  759. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  760. await Task.Delay(500);
  761. Assert.AreEqual(1, (await server.GetClientStatusAsync()).Count);
  762. await c1.DisconnectAsync();
  763. await Task.Delay(500);
  764. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  765. }
  766. }
  767. [TestMethod]
  768. public async Task Stop_And_Restart()
  769. {
  770. using (var testEnvironment = new TestEnvironment())
  771. {
  772. testEnvironment.IgnoreClientLogErrors = true;
  773. var server = await testEnvironment.StartServerAsync();
  774. await testEnvironment.ConnectClientAsync();
  775. await server.StopAsync();
  776. try
  777. {
  778. await testEnvironment.ConnectClientAsync();
  779. Assert.Fail("Connecting should fail.");
  780. }
  781. catch (Exception)
  782. {
  783. }
  784. await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  785. await testEnvironment.ConnectClientAsync();
  786. }
  787. }
  788. [TestMethod]
  789. public async Task Close_Idle_Connection()
  790. {
  791. using (var testEnvironment = new TestEnvironment())
  792. {
  793. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  794. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  795. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  796. // Don't send anything. The server should close the connection.
  797. await Task.Delay(TimeSpan.FromSeconds(3));
  798. try
  799. {
  800. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  801. if (receivedBytes == 0)
  802. {
  803. return;
  804. }
  805. Assert.Fail("Receive should throw an exception.");
  806. }
  807. catch (SocketException)
  808. {
  809. }
  810. }
  811. }
  812. [TestMethod]
  813. public async Task Send_Garbage()
  814. {
  815. using (var testEnvironment = new TestEnvironment())
  816. {
  817. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  818. // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
  819. // forever. This is security related.
  820. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  821. await client.ConnectAsync("localhost", testEnvironment.ServerPort);
  822. await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None);
  823. await Task.Delay(TimeSpan.FromSeconds(3));
  824. try
  825. {
  826. var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  827. if (receivedBytes == 0)
  828. {
  829. return;
  830. }
  831. Assert.Fail("Receive should throw an exception.");
  832. }
  833. catch (SocketException)
  834. {
  835. }
  836. }
  837. }
  838. [TestMethod]
  839. public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  840. {
  841. using (var testEnvironment = new TestEnvironment())
  842. {
  843. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
  844. {
  845. // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
  846. if (c.TopicFilter.Topic == "n")
  847. {
  848. c.AcceptSubscription = false;
  849. }
  850. }));
  851. // Prepare some retained messages.
  852. var client1 = await testEnvironment.ConnectClientAsync();
  853. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("y").WithPayload("x").WithRetainFlag().Build());
  854. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("n").WithPayload("x").WithRetainFlag().Build());
  855. await client1.DisconnectAsync();
  856. await Task.Delay(500);
  857. // Subscribe to all retained message types.
  858. // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
  859. var client2 = await testEnvironment.ConnectClientAsync();
  860. var buffer = new StringBuilder();
  861. client2.UseApplicationMessageReceivedHandler(c =>
  862. {
  863. lock (buffer)
  864. {
  865. buffer.Append(c.ApplicationMessage.Topic);
  866. }
  867. });
  868. await client2.SubscribeAsync(new TopicFilter { Topic = "y" }, new TopicFilter { Topic = "n" });
  869. await Task.Delay(500);
  870. Assert.AreEqual("y", buffer.ToString());
  871. }
  872. }
  873. [TestMethod]
  874. public async Task Collect_Messages_In_Disconnected_Session()
  875. {
  876. using (var testEnvironment = new TestEnvironment())
  877. {
  878. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());
  879. // Create the session including the subscription.
  880. var client1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
  881. await client1.SubscribeAsync("x");
  882. await client1.DisconnectAsync();
  883. await Task.Delay(500);
  884. var clientStatus = await server.GetClientStatusAsync();
  885. Assert.AreEqual(0, clientStatus.Count);
  886. var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
  887. await client2.PublishAsync("x", "1");
  888. await client2.PublishAsync("x", "2");
  889. await client2.PublishAsync("x", "3");
  890. await client2.DisconnectAsync();
  891. await Task.Delay(500);
  892. clientStatus = await server.GetClientStatusAsync();
  893. var sessionStatus = await server.GetSessionStatusAsync();
  894. Assert.AreEqual(0, clientStatus.Count);
  895. Assert.AreEqual(2, sessionStatus.Count);
  896. Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == "a").PendingApplicationMessagesCount);
  897. }
  898. }
  899. private static async Task TestPublishAsync(
  900. string topic,
  901. MqttQualityOfServiceLevel qualityOfServiceLevel,
  902. string topicFilter,
  903. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  904. int expectedReceivedMessagesCount)
  905. {
  906. using (var testEnvironment = new TestEnvironment())
  907. {
  908. var receivedMessagesCount = 0;
  909. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  910. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
  911. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  912. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  913. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));
  914. await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
  915. await c2.DisconnectAsync().ConfigureAwait(false);
  916. await Task.Delay(500);
  917. await c1.UnsubscribeAsync(topicFilter);
  918. await Task.Delay(500);
  919. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  920. }
  921. }
  922. }
  923. }