You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1316 lines
48 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Adapter;
  3. using MQTTnet.Client;
  4. using MQTTnet.Client.Connecting;
  5. using MQTTnet.Client.Disconnecting;
  6. using MQTTnet.Client.Options;
  7. using MQTTnet.Client.Receiving;
  8. using MQTTnet.Client.Subscribing;
  9. using MQTTnet.Implementations;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Server;
  12. using MQTTnet.Tests.Mockups;
  13. using System;
  14. using System.Collections.Generic;
  15. using System.Linq;
  16. using System.Net.Sockets;
  17. using System.Text;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. namespace MQTTnet.Tests
  21. {
  22. [TestClass]
  23. public class Server_Tests
  24. {
  25. public TestContext TestContext { get; set; }
  26. [TestMethod]
  27. public async Task Use_Empty_Client_ID()
  28. {
  29. using (var testEnvironment = new TestEnvironment(TestContext))
  30. {
  31. await testEnvironment.StartServerAsync();
  32. var client = testEnvironment.CreateClient();
  33. var clientOptions = new MqttClientOptionsBuilder()
  34. .WithTcpServer("localhost", testEnvironment.ServerPort)
  35. .WithClientId(string.Empty)
  36. .Build();
  37. var connectResult = await client.ConnectAsync(clientOptions);
  38. Assert.IsFalse(connectResult.IsSessionPresent);
  39. Assert.IsTrue(client.IsConnected);
  40. }
  41. }
  42. [TestMethod]
  43. public async Task Publish_At_Most_Once_0x00()
  44. {
  45. await TestPublishAsync(
  46. "A/B/C",
  47. MqttQualityOfServiceLevel.AtMostOnce,
  48. "A/B/C",
  49. MqttQualityOfServiceLevel.AtMostOnce,
  50. 1,
  51. TestContext);
  52. }
  53. [TestMethod]
  54. public async Task Publish_At_Least_Once_0x01()
  55. {
  56. await TestPublishAsync(
  57. "A/B/C",
  58. MqttQualityOfServiceLevel.AtLeastOnce,
  59. "A/B/C",
  60. MqttQualityOfServiceLevel.AtLeastOnce,
  61. 1,
  62. TestContext);
  63. }
  64. [TestMethod]
  65. public async Task Publish_Exactly_Once_0x02()
  66. {
  67. await TestPublishAsync(
  68. "A/B/C",
  69. MqttQualityOfServiceLevel.ExactlyOnce,
  70. "A/B/C",
  71. MqttQualityOfServiceLevel.ExactlyOnce,
  72. 1,
  73. TestContext);
  74. }
  75. [TestMethod]
  76. public async Task Use_Clean_Session()
  77. {
  78. using (var testEnvironment = new TestEnvironment(TestContext))
  79. {
  80. await testEnvironment.StartServerAsync();
  81. var client = testEnvironment.CreateClient();
  82. var connectResult = await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost", testEnvironment.ServerPort).WithCleanSession().Build());
  83. Assert.IsFalse(connectResult.IsSessionPresent);
  84. }
  85. }
  86. [TestMethod]
  87. public async Task Will_Message_Do_Not_Send()
  88. {
  89. using (var testEnvironment = new TestEnvironment(TestContext))
  90. {
  91. var receivedMessagesCount = 0;
  92. await testEnvironment.StartServerAsync();
  93. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  94. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  95. var c1 = await testEnvironment.ConnectClientAsync();
  96. c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
  97. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  98. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  99. await c2.DisconnectAsync().ConfigureAwait(false);
  100. await Task.Delay(1000);
  101. Assert.AreEqual(0, receivedMessagesCount);
  102. }
  103. }
  104. [TestMethod]
  105. public async Task Will_Message_Send()
  106. {
  107. using (var testEnvironment = new TestEnvironment(TestContext))
  108. {
  109. var receivedMessagesCount = 0;
  110. await testEnvironment.StartServerAsync();
  111. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  112. var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);
  113. var c1 = await testEnvironment.ConnectClientAsync();
  114. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  115. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  116. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  117. c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
  118. await Task.Delay(1000);
  119. Assert.AreEqual(1, receivedMessagesCount);
  120. }
  121. }
  122. [TestMethod]
  123. public async Task Intercept_Subscription()
  124. {
  125. using (var testEnvironment = new TestEnvironment(TestContext))
  126. {
  127. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
  128. c =>
  129. {
  130. // Set the topic to "a" regards what the client wants to subscribe.
  131. c.TopicFilter.Topic = "a";
  132. }));
  133. var topicAReceived = false;
  134. var topicBReceived = false;
  135. var client = await testEnvironment.ConnectClientAsync();
  136. client.UseApplicationMessageReceivedHandler(c =>
  137. {
  138. if (c.ApplicationMessage.Topic == "a")
  139. {
  140. topicAReceived = true;
  141. }
  142. else if (c.ApplicationMessage.Topic == "b")
  143. {
  144. topicBReceived = true;
  145. }
  146. });
  147. await client.SubscribeAsync("b");
  148. await client.PublishAsync("a");
  149. await Task.Delay(500);
  150. Assert.IsTrue(topicAReceived);
  151. Assert.IsFalse(topicBReceived);
  152. }
  153. }
  154. [TestMethod]
  155. public async Task Subscribe_Unsubscribe()
  156. {
  157. using (var testEnvironment = new TestEnvironment(TestContext))
  158. {
  159. var receivedMessagesCount = 0;
  160. var server = await testEnvironment.StartServerAsync();
  161. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
  162. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  163. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
  164. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  165. await c2.PublishAsync(message);
  166. await Task.Delay(500);
  167. Assert.AreEqual(0, receivedMessagesCount);
  168. var subscribeEventCalled = false;
  169. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(e =>
  170. {
  171. subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == c1.Options.ClientId;
  172. });
  173. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  174. await Task.Delay(250);
  175. Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");
  176. await c2.PublishAsync(message);
  177. await Task.Delay(250);
  178. Assert.AreEqual(1, receivedMessagesCount);
  179. var unsubscribeEventCalled = false;
  180. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
  181. {
  182. unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == c1.Options.ClientId;
  183. });
  184. await c1.UnsubscribeAsync("a");
  185. await Task.Delay(250);
  186. Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");
  187. await c2.PublishAsync(message);
  188. await Task.Delay(500);
  189. Assert.AreEqual(1, receivedMessagesCount);
  190. await Task.Delay(500);
  191. Assert.AreEqual(1, receivedMessagesCount);
  192. }
  193. }
  194. [TestMethod]
  195. public async Task Subscribe_Multiple_In_Single_Request()
  196. {
  197. using (var testEnvironment = new TestEnvironment(TestContext))
  198. {
  199. var receivedMessagesCount = 0;
  200. await testEnvironment.StartServerAsync();
  201. var c1 = await testEnvironment.ConnectClientAsync();
  202. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  203. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  204. .WithTopicFilter("a")
  205. .WithTopicFilter("b")
  206. .WithTopicFilter("c")
  207. .Build());
  208. var c2 = await testEnvironment.ConnectClientAsync();
  209. await c2.PublishAsync("a");
  210. await Task.Delay(100);
  211. Assert.AreEqual(receivedMessagesCount, 1);
  212. await c2.PublishAsync("b");
  213. await Task.Delay(100);
  214. Assert.AreEqual(receivedMessagesCount, 2);
  215. await c2.PublishAsync("c");
  216. await Task.Delay(100);
  217. Assert.AreEqual(receivedMessagesCount, 3);
  218. }
  219. }
  220. [TestMethod]
  221. public async Task Subscribe_Multiple_In_Multiple_Request()
  222. {
  223. using (var testEnvironment = new TestEnvironment(TestContext))
  224. {
  225. var receivedMessagesCount = 0;
  226. await testEnvironment.StartServerAsync();
  227. var c1 = await testEnvironment.ConnectClientAsync();
  228. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  229. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  230. .WithTopicFilter("a")
  231. .Build());
  232. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  233. .WithTopicFilter("b")
  234. .Build());
  235. await c1.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
  236. .WithTopicFilter("c")
  237. .Build());
  238. var c2 = await testEnvironment.ConnectClientAsync();
  239. await c2.PublishAsync("a");
  240. await Task.Delay(100);
  241. Assert.AreEqual(receivedMessagesCount, 1);
  242. await c2.PublishAsync("b");
  243. await Task.Delay(100);
  244. Assert.AreEqual(receivedMessagesCount, 2);
  245. await c2.PublishAsync("c");
  246. await Task.Delay(100);
  247. Assert.AreEqual(receivedMessagesCount, 3);
  248. }
  249. }
  250. [TestMethod]
  251. public async Task Publish_From_Server()
  252. {
  253. using (var testEnvironment = new TestEnvironment(TestContext))
  254. {
  255. var server = await testEnvironment.StartServerAsync();
  256. var receivedMessagesCount = 0;
  257. var client = await testEnvironment.ConnectClientAsync();
  258. client.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  259. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  260. await client.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  261. await server.PublishAsync(message);
  262. await Task.Delay(1000);
  263. Assert.AreEqual(1, receivedMessagesCount);
  264. }
  265. }
  266. [TestMethod]
  267. public async Task Publish_Multiple_Clients()
  268. {
  269. var receivedMessagesCount = 0;
  270. var locked = new object();
  271. using (var testEnvironment = new TestEnvironment(TestContext))
  272. {
  273. await testEnvironment.StartServerAsync();
  274. var c1 = await testEnvironment.ConnectClientAsync();
  275. var c2 = await testEnvironment.ConnectClientAsync();
  276. c1.UseApplicationMessageReceivedHandler(c =>
  277. {
  278. lock (locked)
  279. {
  280. receivedMessagesCount++;
  281. }
  282. });
  283. c2.UseApplicationMessageReceivedHandler(c =>
  284. {
  285. lock (locked)
  286. {
  287. receivedMessagesCount++;
  288. }
  289. });
  290. await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  291. await c2.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  292. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  293. for (var i = 0; i < 1000; i++)
  294. {
  295. await c1.PublishAsync(message);
  296. }
  297. await Task.Delay(500);
  298. Assert.AreEqual(2000, receivedMessagesCount);
  299. }
  300. }
  301. [TestMethod]
  302. public async Task Session_Takeover()
  303. {
  304. using (var testEnvironment = new TestEnvironment(TestContext))
  305. {
  306. await testEnvironment.StartServerAsync();
  307. var options = new MqttClientOptionsBuilder()
  308. .WithCleanSession(false)
  309. .WithClientId("a");
  310. var client1 = await testEnvironment.ConnectClientAsync(options);
  311. await Task.Delay(500);
  312. var client2 = await testEnvironment.ConnectClientAsync(options);
  313. await Task.Delay(500);
  314. Assert.IsFalse(client1.IsConnected);
  315. Assert.IsTrue(client2.IsConnected);
  316. }
  317. }
  318. [TestMethod]
  319. public async Task No_Messages_If_No_Subscription()
  320. {
  321. using (var testEnvironment = new TestEnvironment(TestContext))
  322. {
  323. await testEnvironment.StartServerAsync();
  324. var client = await testEnvironment.ConnectClientAsync();
  325. var receivedMessages = new List<MqttApplicationMessage>();
  326. client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
  327. {
  328. await client.PublishAsync("Connected");
  329. });
  330. client.UseApplicationMessageReceivedHandler(c =>
  331. {
  332. lock (receivedMessages)
  333. {
  334. receivedMessages.Add(c.ApplicationMessage);
  335. }
  336. });
  337. await Task.Delay(500);
  338. await client.PublishAsync("Hello");
  339. await Task.Delay(500);
  340. Assert.AreEqual(0, receivedMessages.Count);
  341. }
  342. }
  343. [TestMethod]
  344. public async Task Set_Subscription_At_Server()
  345. {
  346. using (var testEnvironment = new TestEnvironment(TestContext))
  347. {
  348. var server = await testEnvironment.StartServerAsync();
  349. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
  350. var client = await testEnvironment.ConnectClientAsync();
  351. var receivedMessages = new List<MqttApplicationMessage>();
  352. client.UseApplicationMessageReceivedHandler(c =>
  353. {
  354. lock (receivedMessages)
  355. {
  356. receivedMessages.Add(c.ApplicationMessage);
  357. }
  358. });
  359. await Task.Delay(500);
  360. await client.PublishAsync("Hello");
  361. await Task.Delay(100);
  362. Assert.AreEqual(0, receivedMessages.Count);
  363. await client.PublishAsync("topic1");
  364. await Task.Delay(100);
  365. Assert.AreEqual(1, receivedMessages.Count);
  366. }
  367. }
  368. [TestMethod]
  369. public async Task Shutdown_Disconnects_Clients_Gracefully()
  370. {
  371. using (var testEnvironment = new TestEnvironment(TestContext))
  372. {
  373. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  374. var disconnectCalled = 0;
  375. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  376. c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => disconnectCalled++);
  377. await Task.Delay(100);
  378. await server.StopAsync();
  379. await Task.Delay(100);
  380. Assert.AreEqual(1, disconnectCalled);
  381. }
  382. }
  383. [TestMethod]
  384. public async Task Handle_Clean_Disconnect()
  385. {
  386. using (var testEnvironment = new TestEnvironment(TestContext))
  387. {
  388. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  389. var clientConnectedCalled = 0;
  390. var clientDisconnectedCalled = 0;
  391. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => Interlocked.Increment(ref clientConnectedCalled));
  392. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref clientDisconnectedCalled));
  393. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  394. Assert.AreEqual(1, clientConnectedCalled);
  395. Assert.AreEqual(0, clientDisconnectedCalled);
  396. await Task.Delay(500);
  397. await c1.DisconnectAsync();
  398. await Task.Delay(500);
  399. Assert.AreEqual(1, clientConnectedCalled);
  400. Assert.AreEqual(1, clientDisconnectedCalled);
  401. }
  402. }
  403. [TestMethod]
  404. public async Task Client_Disconnect_Without_Errors()
  405. {
  406. using (var testEnvironment = new TestEnvironment(TestContext))
  407. {
  408. bool clientWasConnected;
  409. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  410. try
  411. {
  412. var client = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());
  413. clientWasConnected = true;
  414. await client.DisconnectAsync();
  415. await Task.Delay(500);
  416. }
  417. finally
  418. {
  419. await server.StopAsync();
  420. }
  421. Assert.IsTrue(clientWasConnected);
  422. testEnvironment.ThrowIfLogErrors();
  423. }
  424. }
  425. [TestMethod]
  426. public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  427. {
  428. const int ClientCount = 50;
  429. using (var testEnvironment = new TestEnvironment(TestContext))
  430. {
  431. var server = await testEnvironment.StartServerAsync();
  432. var tasks = new List<Task>();
  433. for (var i = 0; i < ClientCount; i++)
  434. {
  435. var i2 = i;
  436. var testEnvironment2 = testEnvironment;
  437. tasks.Add(Task.Run(async () =>
  438. {
  439. try
  440. {
  441. using (var client = await testEnvironment2.ConnectClientAsync())
  442. {
  443. // Clear retained message.
  444. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  445. .WithPayload(new byte[0]).WithRetainFlag().Build());
  446. // Set retained message.
  447. await client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("r" + i2)
  448. .WithPayload("value").WithRetainFlag().Build());
  449. await client.DisconnectAsync();
  450. }
  451. }
  452. catch (Exception exception)
  453. {
  454. testEnvironment2.TrackException(exception);
  455. }
  456. }));
  457. }
  458. await Task.WhenAll(tasks);
  459. await Task.Delay(1000);
  460. var retainedMessages = await server.GetRetainedApplicationMessagesAsync();
  461. Assert.AreEqual(ClientCount, retainedMessages.Count);
  462. for (var i = 0; i < ClientCount; i++)
  463. {
  464. Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + i));
  465. }
  466. }
  467. }
  468. [TestMethod]
  469. public async Task Retained_Messages_Flow()
  470. {
  471. using (var testEnvironment = new TestEnvironment(TestContext))
  472. {
  473. var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
  474. await testEnvironment.StartServerAsync();
  475. var c1 = await testEnvironment.ConnectClientAsync();
  476. var receivedMessages = 0;
  477. var c2 = await testEnvironment.ConnectClientAsync();
  478. c2.UseApplicationMessageReceivedHandler(c =>
  479. {
  480. Interlocked.Increment(ref receivedMessages);
  481. });
  482. await c1.PublishAsync(retainedMessage);
  483. await c1.DisconnectAsync();
  484. await Task.Delay(500);
  485. for (var i = 0; i < 5; i++)
  486. {
  487. await c2.UnsubscribeAsync("r");
  488. await Task.Delay(100);
  489. Assert.AreEqual(i, receivedMessages);
  490. await c2.SubscribeAsync("r");
  491. await Task.Delay(100);
  492. Assert.AreEqual(i + 1, receivedMessages);
  493. }
  494. await c2.DisconnectAsync();
  495. }
  496. }
  497. [TestMethod]
  498. public async Task Receive_No_Retained_Message_After_Subscribe()
  499. {
  500. using (var testEnvironment = new TestEnvironment(TestContext))
  501. {
  502. await testEnvironment.StartServerAsync();
  503. var c1 = await testEnvironment.ConnectClientAsync();
  504. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  505. await c1.DisconnectAsync();
  506. var receivedMessagesCount = 0;
  507. var c2 = await testEnvironment.ConnectClientAsync();
  508. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  509. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained_other").Build());
  510. await Task.Delay(500);
  511. Assert.AreEqual(0, receivedMessagesCount);
  512. }
  513. }
  514. [TestMethod]
  515. public async Task Receive_Retained_Message_After_Subscribe()
  516. {
  517. using (var testEnvironment = new TestEnvironment(TestContext))
  518. {
  519. await testEnvironment.StartServerAsync();
  520. var c1 = await testEnvironment.ConnectClientAsync();
  521. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  522. await c1.DisconnectAsync();
  523. var receivedMessages = new List<MqttApplicationMessage>();
  524. var c2 = await testEnvironment.ConnectClientAsync();
  525. c2.UseApplicationMessageReceivedHandler(c =>
  526. {
  527. lock (receivedMessages)
  528. {
  529. receivedMessages.Add(c.ApplicationMessage);
  530. }
  531. });
  532. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  533. await Task.Delay(500);
  534. Assert.AreEqual(1, receivedMessages.Count);
  535. Assert.IsTrue(receivedMessages.First().Retain);
  536. }
  537. }
  538. [TestMethod]
  539. public async Task Clear_Retained_Message_With_Empty_Payload()
  540. {
  541. using (var testEnvironment = new TestEnvironment(TestContext))
  542. {
  543. var receivedMessagesCount = 0;
  544. await testEnvironment.StartServerAsync();
  545. var c1 = await testEnvironment.ConnectClientAsync();
  546. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  547. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
  548. await c1.DisconnectAsync();
  549. var c2 = await testEnvironment.ConnectClientAsync();
  550. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  551. await Task.Delay(200);
  552. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  553. await Task.Delay(500);
  554. Assert.AreEqual(0, receivedMessagesCount);
  555. }
  556. }
  557. [TestMethod]
  558. public async Task Clear_Retained_Message_With_Null_Payload()
  559. {
  560. using (var testEnvironment = new TestEnvironment(TestContext))
  561. {
  562. var receivedMessagesCount = 0;
  563. await testEnvironment.StartServerAsync();
  564. var c1 = await testEnvironment.ConnectClientAsync();
  565. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  566. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload((byte[])null).WithRetainFlag().Build());
  567. await c1.DisconnectAsync();
  568. var c2 = await testEnvironment.ConnectClientAsync();
  569. c2.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  570. await Task.Delay(200);
  571. await c2.SubscribeAsync(new TopicFilter { Topic = "retained", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  572. await Task.Delay(500);
  573. Assert.AreEqual(0, receivedMessagesCount);
  574. }
  575. }
  576. [TestMethod]
  577. public async Task Intercept_Application_Message()
  578. {
  579. using (var testEnvironment = new TestEnvironment(TestContext))
  580. {
  581. await testEnvironment.StartServerAsync(
  582. new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
  583. c => { c.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; }));
  584. string receivedTopic = null;
  585. var c1 = await testEnvironment.ConnectClientAsync();
  586. await c1.SubscribeAsync("#");
  587. c1.UseApplicationMessageReceivedHandler(a => { receivedTopic = a.ApplicationMessage.Topic; });
  588. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("original_topic").Build());
  589. await Task.Delay(500);
  590. Assert.AreEqual("new_topic", receivedTopic);
  591. }
  592. }
  593. [TestMethod]
  594. public async Task Persist_Retained_Message()
  595. {
  596. var serverStorage = new TestServerStorage();
  597. using (var testEnvironment = new TestEnvironment(TestContext))
  598. {
  599. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithStorage(serverStorage));
  600. var c1 = await testEnvironment.ConnectClientAsync();
  601. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  602. await Task.Delay(500);
  603. Assert.AreEqual(1, serverStorage.Messages.Count);
  604. }
  605. }
  606. [TestMethod]
  607. public async Task Publish_After_Client_Connects()
  608. {
  609. using (var testEnvironment = new TestEnvironment(TestContext))
  610. {
  611. var server = await testEnvironment.StartServerAsync();
  612. server.UseClientConnectedHandler(async e =>
  613. {
  614. await server.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
  615. });
  616. string receivedTopic = null;
  617. var c1 = await testEnvironment.ConnectClientAsync();
  618. c1.UseApplicationMessageReceivedHandler(e => { receivedTopic = e.ApplicationMessage.Topic; });
  619. await c1.SubscribeAsync("#");
  620. await testEnvironment.ConnectClientAsync();
  621. await testEnvironment.ConnectClientAsync();
  622. await testEnvironment.ConnectClientAsync();
  623. await testEnvironment.ConnectClientAsync();
  624. await Task.Delay(500);
  625. Assert.AreEqual("/test/1", receivedTopic);
  626. }
  627. }
  628. [TestMethod]
  629. public async Task Intercept_Message()
  630. {
  631. void Interceptor(MqttApplicationMessageInterceptorContext context)
  632. {
  633. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  634. }
  635. using (var testEnvironment = new TestEnvironment(TestContext))
  636. {
  637. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(Interceptor));
  638. var c1 = await testEnvironment.ConnectClientAsync();
  639. var c2 = await testEnvironment.ConnectClientAsync();
  640. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  641. var isIntercepted = false;
  642. c2.UseApplicationMessageReceivedHandler(c =>
  643. {
  644. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(c.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  645. });
  646. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("test").Build());
  647. await c1.DisconnectAsync();
  648. await Task.Delay(500);
  649. Assert.IsTrue(isIntercepted);
  650. }
  651. }
  652. [TestMethod]
  653. public async Task Send_Long_Body()
  654. {
  655. using (var testEnvironment = new TestEnvironment(TestContext))
  656. {
  657. const int PayloadSizeInMB = 30;
  658. const int CharCount = PayloadSizeInMB * 1024 * 1024;
  659. var longBody = new byte[CharCount];
  660. byte @char = 32;
  661. for (long i = 0; i < PayloadSizeInMB * 1024L * 1024L; i++)
  662. {
  663. longBody[i] = @char;
  664. @char++;
  665. if (@char > 126)
  666. {
  667. @char = 32;
  668. }
  669. }
  670. byte[] receivedBody = null;
  671. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  672. var client1 = await testEnvironment.ConnectClientAsync();
  673. client1.UseApplicationMessageReceivedHandler(c =>
  674. {
  675. receivedBody = c.ApplicationMessage.Payload;
  676. });
  677. await client1.SubscribeAsync("string");
  678. var client2 = await testEnvironment.ConnectClientAsync();
  679. await client2.PublishAsync("string", longBody);
  680. await Task.Delay(500);
  681. Assert.IsTrue(longBody.SequenceEqual(receivedBody ?? new byte[0]));
  682. }
  683. }
  684. [TestMethod]
  685. public async Task Deny_Connection()
  686. {
  687. var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
  688. {
  689. context.ReasonCode = MqttConnectReasonCode.NotAuthorized;
  690. });
  691. using (var testEnvironment = new TestEnvironment(TestContext))
  692. {
  693. testEnvironment.IgnoreClientLogErrors = true;
  694. await testEnvironment.StartServerAsync(serverOptions);
  695. var connectingFailedException = await Assert.ThrowsExceptionAsync<MqttConnectingFailedException>(() => testEnvironment.ConnectClientAsync());
  696. Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
  697. }
  698. }
  699. private Dictionary<string, bool> _connected;
  700. private void ConnectionValidationHandler(MqttConnectionValidatorContext eventArgs)
  701. {
  702. if (_connected.ContainsKey(eventArgs.ClientId))
  703. {
  704. eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  705. return;
  706. }
  707. _connected[eventArgs.ClientId] = true;
  708. eventArgs.ReasonCode = MqttConnectReasonCode.Success;
  709. return;
  710. }
  711. [TestMethod]
  712. public async Task Same_Client_Id_Refuse_Connection()
  713. {
  714. using (var testEnvironment = new TestEnvironment(TestContext))
  715. {
  716. testEnvironment.IgnoreClientLogErrors = true;
  717. _connected = new Dictionary<string, bool>();
  718. var options = new MqttServerOptionsBuilder();
  719. options.WithConnectionValidator(e => ConnectionValidationHandler(e));
  720. var server = await testEnvironment.StartServerAsync(options);
  721. var events = new List<string>();
  722. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
  723. {
  724. lock (events)
  725. {
  726. events.Add("c");
  727. }
  728. });
  729. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
  730. {
  731. lock (events)
  732. {
  733. events.Add("d");
  734. }
  735. });
  736. var clientOptions = new MqttClientOptionsBuilder()
  737. .WithClientId("same_id");
  738. // c
  739. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  740. c1.UseDisconnectedHandler(_ =>
  741. {
  742. lock (events)
  743. {
  744. events.Add("x");
  745. }
  746. });
  747. c1.UseApplicationMessageReceivedHandler(_ =>
  748. {
  749. lock (events)
  750. {
  751. events.Add("r");
  752. }
  753. });
  754. c1.SubscribeAsync("topic").Wait();
  755. await Task.Delay(500);
  756. c1.PublishAsync("topic").Wait();
  757. await Task.Delay(500);
  758. var flow = string.Join(string.Empty, events);
  759. Assert.AreEqual("cr", flow);
  760. try
  761. {
  762. await testEnvironment.ConnectClientAsync(clientOptions);
  763. Assert.Fail("same id connection is expected to fail");
  764. }
  765. catch
  766. {
  767. //same id connection is expected to fail
  768. }
  769. await Task.Delay(500);
  770. flow = string.Join(string.Empty, events);
  771. Assert.AreEqual("cr", flow);
  772. c1.PublishAsync("topic").Wait();
  773. await Task.Delay(500);
  774. flow = string.Join(string.Empty, events);
  775. Assert.AreEqual("crr", flow);
  776. }
  777. }
  778. [TestMethod]
  779. public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  780. {
  781. using (var testEnvironment = new TestEnvironment(TestContext))
  782. {
  783. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  784. var events = new List<string>();
  785. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
  786. {
  787. lock (events)
  788. {
  789. events.Add("c");
  790. }
  791. });
  792. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
  793. {
  794. lock (events)
  795. {
  796. events.Add("d");
  797. }
  798. });
  799. var clientOptions = new MqttClientOptionsBuilder()
  800. .WithClientId("same_id");
  801. // c
  802. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  803. await Task.Delay(500);
  804. var flow = string.Join(string.Empty, events);
  805. Assert.AreEqual("c", flow);
  806. // dc
  807. // Connect client with same client ID. Should disconnect existing client.
  808. var c2 = await testEnvironment.ConnectClientAsync(clientOptions);
  809. c2.UseApplicationMessageReceivedHandler(_ =>
  810. {
  811. lock (events)
  812. {
  813. events.Add("r");
  814. }
  815. });
  816. c2.SubscribeAsync("topic").Wait();
  817. await Task.Delay(500);
  818. flow = string.Join(string.Empty, events);
  819. Assert.AreEqual("cdc", flow);
  820. // r
  821. c2.PublishAsync("topic").Wait();
  822. await Task.Delay(500);
  823. flow = string.Join(string.Empty, events);
  824. Assert.AreEqual("cdcr", flow);
  825. // nothing
  826. Assert.AreEqual(false, c1.IsConnected);
  827. await c1.DisconnectAsync();
  828. Assert.AreEqual(false, c1.IsConnected);
  829. await Task.Delay(500);
  830. // d
  831. Assert.AreEqual(true, c2.IsConnected);
  832. await c2.DisconnectAsync();
  833. await Task.Delay(500);
  834. await server.StopAsync();
  835. flow = string.Join(string.Empty, events);
  836. Assert.AreEqual("cdcrd", flow);
  837. }
  838. }
  839. [TestMethod]
  840. public async Task Remove_Session()
  841. {
  842. using (var testEnvironment = new TestEnvironment(TestContext))
  843. {
  844. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  845. var clientOptions = new MqttClientOptionsBuilder();
  846. var c1 = await testEnvironment.ConnectClientAsync(clientOptions);
  847. await Task.Delay(500);
  848. Assert.AreEqual(1, (await server.GetClientStatusAsync()).Count);
  849. await c1.DisconnectAsync();
  850. await Task.Delay(500);
  851. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  852. }
  853. }
  854. [TestMethod]
  855. public async Task Stop_And_Restart()
  856. {
  857. using (var testEnvironment = new TestEnvironment(TestContext))
  858. {
  859. testEnvironment.IgnoreClientLogErrors = true;
  860. var server = await testEnvironment.StartServerAsync();
  861. await testEnvironment.ConnectClientAsync();
  862. await server.StopAsync();
  863. try
  864. {
  865. await testEnvironment.ConnectClientAsync();
  866. Assert.Fail("Connecting should fail.");
  867. }
  868. catch (Exception)
  869. {
  870. }
  871. await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  872. await testEnvironment.ConnectClientAsync();
  873. }
  874. }
  875. [TestMethod]
  876. public async Task Close_Idle_Connection()
  877. {
  878. using (var testEnvironment = new TestEnvironment(TestContext))
  879. {
  880. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  881. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  882. await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);
  883. // Don't send anything. The server should close the connection.
  884. await Task.Delay(TimeSpan.FromSeconds(3));
  885. try
  886. {
  887. var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  888. if (receivedBytes == 0)
  889. {
  890. return;
  891. }
  892. Assert.Fail("Receive should throw an exception.");
  893. }
  894. catch (SocketException)
  895. {
  896. }
  897. }
  898. }
  899. [TestMethod]
  900. public async Task Send_Garbage()
  901. {
  902. using (var testEnvironment = new TestEnvironment(TestContext))
  903. {
  904. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
  905. // Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
  906. // forever. This is security related.
  907. var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  908. await PlatformAbstractionLayer.ConnectAsync(client, "localhost", testEnvironment.ServerPort);
  909. var buffer = Encoding.UTF8.GetBytes("Garbage");
  910. client.Send(buffer, buffer.Length, SocketFlags.None);
  911. await Task.Delay(TimeSpan.FromSeconds(3));
  912. try
  913. {
  914. var receivedBytes = await PlatformAbstractionLayer.ReceiveAsync(client, new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);
  915. if (receivedBytes == 0)
  916. {
  917. return;
  918. }
  919. Assert.Fail("Receive should throw an exception.");
  920. }
  921. catch (SocketException)
  922. {
  923. }
  924. }
  925. }
  926. [TestMethod]
  927. public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  928. {
  929. using (var testEnvironment = new TestEnvironment(TestContext))
  930. {
  931. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(c =>
  932. {
  933. // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
  934. if (c.TopicFilter.Topic == "n")
  935. {
  936. c.AcceptSubscription = false;
  937. }
  938. }));
  939. // Prepare some retained messages.
  940. var client1 = await testEnvironment.ConnectClientAsync();
  941. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("y").WithPayload("x").WithRetainFlag().Build());
  942. await client1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("n").WithPayload("x").WithRetainFlag().Build());
  943. await client1.DisconnectAsync();
  944. await Task.Delay(500);
  945. // Subscribe to all retained message types.
  946. // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
  947. var client2 = await testEnvironment.ConnectClientAsync();
  948. var buffer = new StringBuilder();
  949. client2.UseApplicationMessageReceivedHandler(c =>
  950. {
  951. lock (buffer)
  952. {
  953. buffer.Append(c.ApplicationMessage.Topic);
  954. }
  955. });
  956. await client2.SubscribeAsync(new TopicFilter { Topic = "y" }, new TopicFilter { Topic = "n" });
  957. await Task.Delay(500);
  958. Assert.AreEqual("y", buffer.ToString());
  959. }
  960. }
  961. [TestMethod]
  962. public async Task Collect_Messages_In_Disconnected_Session()
  963. {
  964. using (var testEnvironment = new TestEnvironment(TestContext))
  965. {
  966. var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());
  967. // Create the session including the subscription.
  968. var client1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
  969. await client1.SubscribeAsync("x");
  970. await client1.DisconnectAsync();
  971. await Task.Delay(500);
  972. var clientStatus = await server.GetClientStatusAsync();
  973. Assert.AreEqual(0, clientStatus.Count);
  974. var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
  975. await client2.PublishAsync("x", "1");
  976. await client2.PublishAsync("x", "2");
  977. await client2.PublishAsync("x", "3");
  978. await client2.DisconnectAsync();
  979. await Task.Delay(500);
  980. clientStatus = await server.GetClientStatusAsync();
  981. var sessionStatus = await server.GetSessionStatusAsync();
  982. Assert.AreEqual(0, clientStatus.Count);
  983. Assert.AreEqual(2, sessionStatus.Count);
  984. Assert.AreEqual(3, sessionStatus.First(s => s.ClientId == client1.Options.ClientId).PendingApplicationMessagesCount);
  985. }
  986. }
  987. private static async Task TestPublishAsync(
  988. string topic,
  989. MqttQualityOfServiceLevel qualityOfServiceLevel,
  990. string topicFilter,
  991. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  992. int expectedReceivedMessagesCount,
  993. TestContext testContext)
  994. {
  995. using (var testEnvironment = new TestEnvironment(testContext))
  996. {
  997. var receivedMessagesCount = 0;
  998. await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());
  999. var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
  1000. c1.UseApplicationMessageReceivedHandler(c => Interlocked.Increment(ref receivedMessagesCount));
  1001. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  1002. var c2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));
  1003. await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
  1004. await c2.DisconnectAsync().ConfigureAwait(false);
  1005. await Task.Delay(500);
  1006. await c1.UnsubscribeAsync(topicFilter);
  1007. await Task.Delay(500);
  1008. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  1009. }
  1010. }
  1011. }
  1012. }