You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

1061 lines
39 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Microsoft.VisualStudio.TestTools.UnitTesting;
  9. using MQTTnet.Adapter;
  10. using MQTTnet.Client;
  11. using MQTTnet.Client.Connecting;
  12. using MQTTnet.Client.Disconnecting;
  13. using MQTTnet.Client.Options;
  14. using MQTTnet.Client.Receiving;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server;
  17. using MQTTnet.Tests.Mockups;
  18. namespace MQTTnet.Tests
  19. {
  20. [TestClass]
  21. public class Server_Tests
  22. {
  /// <summary>
  /// Publishes one message with QoS "at most once" to "A/B/C" and expects a subscriber
  /// using the same topic filter and QoS level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Most_Once_0x00()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Level = MqttQualityOfServiceLevel.AtMostOnce;

      await TestPublishAsync(
          topic: Topic,
          qualityOfServiceLevel: Level,
          topicFilter: Topic,
          filterQualityOfServiceLevel: Level,
          expectedReceivedMessagesCount: 1);
  }
  /// <summary>
  /// Publishes one message with QoS "at least once" to "A/B/C" and expects a subscriber
  /// using the same topic filter and QoS level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Publish_At_Least_Once_0x01()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Level = MqttQualityOfServiceLevel.AtLeastOnce;

      await TestPublishAsync(
          topic: Topic,
          qualityOfServiceLevel: Level,
          topicFilter: Topic,
          filterQualityOfServiceLevel: Level,
          expectedReceivedMessagesCount: 1);
  }
  /// <summary>
  /// Publishes one message with QoS "exactly once" to "A/B/C" and expects a subscriber
  /// using the same topic filter and QoS level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task Publish_Exactly_Once_0x02()
  {
      const string Topic = "A/B/C";
      const MqttQualityOfServiceLevel Level = MqttQualityOfServiceLevel.ExactlyOnce;

      await TestPublishAsync(
          topic: Topic,
          qualityOfServiceLevel: Level,
          topicFilter: Topic,
          filterQualityOfServiceLevel: Level,
          expectedReceivedMessagesCount: 1);
  }
  /// <summary>
  /// Connects with the clean session option and asserts that the connect result
  /// reports no existing session.
  /// </summary>
  [TestMethod]
  public async Task Use_Clean_Session()
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var mqttClient = environment.CreateClient();

          var cleanSessionOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost", environment.ServerPort)
              .WithCleanSession()
              .Build();

          var result = await mqttClient.ConnectAsync(cleanSessionOptions);

          Assert.IsFalse(result.IsSessionPresent);
      }
  }
  /// <summary>
  /// Asserts that a subscriber on "#" receives no message when a client that
  /// registered a will message disconnects via <c>DisconnectAsync</c>.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Do_Not_Send()
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var willOwnerOptions = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          // The observer subscribes to every topic so any published will would be counted.
          var observer = await environment.ConnectClientAsync();
          observer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(
              _ => Interlocked.Increment(ref deliveredCount));
          var catchAllFilter = new TopicFilterBuilder().WithTopic("#").Build();
          await observer.SubscribeAsync(catchAllFilter);

          var willOwner = await environment.ConnectClientAsync(willOwnerOptions);
          await willOwner.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(1000);

          Assert.AreEqual(0, deliveredCount);
      }
  }
  /// <summary>
  /// Asserts that a subscriber on "#" receives exactly one message when a client that
  /// registered a will message is disposed instead of being disconnected explicitly.
  /// </summary>
  [TestMethod]
  public async Task Will_Message_Send()
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          await environment.StartServerAsync();

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();
          var willOwnerOptions = new MqttClientOptionsBuilder().WithWillMessage(lastWill);

          var observer = await environment.ConnectClientAsync();
          observer.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));
          var catchAllFilter = new TopicFilterBuilder().WithTopic("#").Build();
          await observer.SubscribeAsync(catchAllFilter);

          var willOwner = await environment.ConnectClientAsync(willOwnerOptions);

          // Disposing does not send a DISCONNECT packet first, so the will message must be sent.
          willOwner.Dispose();

          await Task.Delay(1000);

          Assert.AreEqual(1, deliveredCount);
      }
  }
  /// <summary>
  /// Walks client "c1" through not subscribed, subscribed and unsubscribed states for topic "a"
  /// while client "c2" publishes, checking the received message count and the server's
  /// subscribed/unsubscribed handlers after each step.
  /// </summary>
  [TestMethod]
  public async Task Subscribe_Unsubscribe()
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          var broker = await environment.StartServerAsync();

          var subscriber = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c1"));
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          var publisher = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("c2"));
          var testMessage = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();

          // Step 1: nothing is subscribed yet, so nothing may arrive.
          await publisher.PublishAsync(testMessage);
          await Task.Delay(500);
          Assert.AreEqual(0, deliveredCount);

          // Step 2: subscribing must raise the server event and enable delivery.
          var sawSubscribe = false;
          broker.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(args =>
          {
              sawSubscribe = args.TopicFilter.Topic == "a" && args.ClientId == "c1";
          });

          var filter = new TopicFilter
          {
              Topic = "a",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
          };
          await subscriber.SubscribeAsync(filter);
          await Task.Delay(250);
          Assert.IsTrue(sawSubscribe, "Subscribe event not called.");

          await publisher.PublishAsync(testMessage);
          await Task.Delay(250);
          Assert.AreEqual(1, deliveredCount);

          // Step 3: unsubscribing must raise the server event and stop delivery.
          var sawUnsubscribe = false;
          broker.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(args =>
          {
              sawUnsubscribe = args.TopicFilter == "a" && args.ClientId == "c1";
          });

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(250);
          Assert.IsTrue(sawUnsubscribe, "Unsubscribe event not called.");

          await publisher.PublishAsync(testMessage);
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);

          // Wait once more to make sure no late delivery shows up.
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);
      }
  }
  /// <summary>
  /// Publishes a message on topic "a" directly through the server instance and asserts
  /// that a client subscribed to "a" receives it once.
  /// </summary>
  [TestMethod]
  public async Task Publish_From_Server()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync();

          var deliveredCount = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          var serverMessage = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var filter = new TopicFilter
          {
              Topic = "a",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce
          };
          await subscriber.SubscribeAsync(filter);

          await broker.PublishAsync(serverMessage);

          await Task.Delay(1000);

          Assert.AreEqual(1, deliveredCount);
      }
  }
  /// <summary>
  /// Subscribes two clients to topic "a", publishes 1000 messages from the first client and
  /// asserts that both clients together received 2000 messages.
  /// </summary>
  [TestMethod]
  public async Task Publish_Multiple_Clients()
  {
      var receivedMessagesCount = 0;

      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync();

          var c1 = await testEnvironment.ConnectClientAsync();
          var c2 = await testEnvironment.ConnectClientAsync();

          // Both handlers bump the same counter. Interlocked keeps the increment atomic,
          // matching how the other tests in this class count received messages.
          c1.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref receivedMessagesCount));
          c2.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref receivedMessagesCount));

          await c1.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
          await c2.SubscribeAsync(new TopicFilter { Topic = "a", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });

          var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();

          for (var i = 0; i < 1000; i++)
          {
              await c1.PublishAsync(message);
          }

          await Task.Delay(500);

          Assert.AreEqual(2000, receivedMessagesCount);
      }
  }
  /// <summary>
  /// Connects two clients with the same client ID "a" (clean session disabled) and asserts
  /// that the first one is no longer connected while the second one is.
  /// </summary>
  [TestMethod]
  public async Task Session_Takeover()
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var sharedOptions = new MqttClientOptionsBuilder()
              .WithCleanSession(false)
              .WithClientId("a");

          var firstClient = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          var secondClient = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);

          Assert.IsFalse(firstClient.IsConnected);
          Assert.IsTrue(secondClient.IsConnected);
      }
  }
  /// <summary>
  /// Asserts that a client which never subscribed receives no application messages,
  /// even though it publishes messages itself.
  /// </summary>
  [TestMethod]
  public async Task No_Messages_If_No_Subscription()
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          // NOTE(review): this handler is attached after ConnectClientAsync has completed — confirm whether it is expected to fire in this test.
          mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
          {
              await mqttClient.PublishAsync("Connected");
          });

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          await mqttClient.PublishAsync("Hello");

          await Task.Delay(500);

          Assert.AreEqual(0, inbox.Count);
      }
  }
  /// <summary>
  /// Lets the server subscribe every connecting client to "topic1" and asserts that the
  /// client receives only the message published on that topic.
  /// </summary>
  [TestMethod]
  public async Task Set_Subscription_At_Server()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync();

          // The subscription is created on the server side, not requested by the client.
          broker.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(
              args => broker.SubscribeAsync(args.ClientId, "topic1"));

          var mqttClient = await environment.ConnectClientAsync();
          var inbox = new List<MqttApplicationMessage>();

          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          await Task.Delay(500);

          // A topic without a subscription must not be delivered.
          await mqttClient.PublishAsync("Hello");
          await Task.Delay(100);
          Assert.AreEqual(0, inbox.Count);

          // The server-side subscription must deliver this one.
          await mqttClient.PublishAsync("topic1");
          await Task.Delay(100);
          Assert.AreEqual(1, inbox.Count);
      }
  }
  /// <summary>
  /// Stops the server while one client is connected and asserts that the client's
  /// disconnected handler was invoked exactly once.
  /// </summary>
  [TestMethod]
  public async Task Shutdown_Disconnects_Clients_Gracefully()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          var server = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder());

          var disconnectCalled = 0;

          var c1 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // Increment atomically, as the other handler-driven counters in this class do.
          c1.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(_ => Interlocked.Increment(ref disconnectCalled));

          await Task.Delay(100);

          await server.StopAsync();

          await Task.Delay(100);

          Assert.AreEqual(1, disconnectCalled);
      }
  }
  /// <summary>
  /// Counts the server's client connected/disconnected handler invocations around one
  /// connect and one explicit disconnect and asserts each fired exactly once.
  /// </summary>
  [TestMethod]
  public async Task Handle_Clean_Disconnect()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var connectedEvents = 0;
          var disconnectedEvents = 0;

          broker.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(
              _ => Interlocked.Increment(ref connectedEvents));
          broker.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(
              _ => Interlocked.Increment(ref disconnectedEvents));

          var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());

          // State right after connecting.
          Assert.AreEqual(1, connectedEvents);
          Assert.AreEqual(0, disconnectedEvents);

          await Task.Delay(500);
          await mqttClient.DisconnectAsync();
          await Task.Delay(500);

          // State after the explicit disconnect.
          Assert.AreEqual(1, connectedEvents);
          Assert.AreEqual(1, disconnectedEvents);
      }
  }
  /// <summary>
  /// Connects and disconnects a client, stops the server, and then asks the test
  /// environment to throw if any errors were logged.
  /// </summary>
  [TestMethod]
  public async Task Client_Disconnect_Without_Errors()
  {
      using (var environment = new TestEnvironment())
      {
          var connected = false;

          var broker = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          try
          {
              var mqttClient = await environment.ConnectClientAsync(new MqttClientOptionsBuilder());
              connected = true;

              await mqttClient.DisconnectAsync();

              await Task.Delay(500);
          }
          finally
          {
              // The server is stopped even when connecting or disconnecting throws.
              await broker.StopAsync();
          }

          Assert.IsTrue(connected);

          environment.ThrowIfLogErrors();
      }
  }
  /// <summary>
  /// Runs 50 clients in parallel; each clears and then sets a retained message on its own
  /// topic "r{index}". Afterwards the server must hold one retained message per topic.
  /// </summary>
  [TestMethod]
  public async Task Handle_Lots_Of_Parallel_Retained_Messages()
  {
      const int ClientCount = 50;

      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync();

          var publishTasks = Enumerable.Range(0, ClientCount)
              .Select(index => Task.Run(async () =>
              {
                  try
                  {
                      using (var mqttClient = await environment.ConnectClientAsync())
                      {
                          var topic = "r" + index;

                          // Clear retained message.
                          var clearMessage = new MqttApplicationMessageBuilder()
                              .WithTopic(topic)
                              .WithPayload(new byte[0])
                              .WithRetainFlag()
                              .Build();
                          await mqttClient.PublishAsync(clearMessage);

                          // Set retained message.
                          var valueMessage = new MqttApplicationMessageBuilder()
                              .WithTopic(topic)
                              .WithPayload("value")
                              .WithRetainFlag()
                              .Build();
                          await mqttClient.PublishAsync(valueMessage);

                          await mqttClient.DisconnectAsync();
                      }
                  }
                  catch (Exception exception)
                  {
                      // Failures inside the worker are handed to the test environment.
                      environment.TrackException(exception);
                  }
              }))
              .ToList();

          await Task.WhenAll(publishTasks);

          await Task.Delay(1000);

          var retained = await broker.GetRetainedApplicationMessagesAsync();

          Assert.AreEqual(ClientCount, retained.Count);

          foreach (var index in Enumerable.Range(0, ClientCount))
          {
              Assert.IsTrue(retained.Any(m => m.Topic == "r" + index));
          }
      }
  }
  /// <summary>
  /// Publishes one retained message on topic "r" and then unsubscribes and re-subscribes a
  /// second client five times, asserting that each subscribe adds exactly one received message.
  /// </summary>
  [TestMethod]
  public async Task Retained_Messages_Flow()
  {
      using (var environment = new TestEnvironment())
      {
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("r")
              .WithPayload("r")
              .WithRetainFlag()
              .Build();

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var deliveredCount = 0;
          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ =>
          {
              Interlocked.Increment(ref deliveredCount);
          });

          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          for (var round = 1; round <= 5; round++)
          {
              // Unsubscribing must not deliver anything new.
              await subscriber.UnsubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round - 1, deliveredCount);

              // Each new subscription must deliver one more message.
              await subscriber.SubscribeAsync("r");
              await Task.Delay(100);
              Assert.AreEqual(round, deliveredCount);
          }

          await subscriber.DisconnectAsync();
      }
  }
  /// <summary>
  /// Stores a retained message on topic "retained" and asserts that a client subscribing
  /// to the different topic "retained_other" receives nothing.
  /// </summary>
  [TestMethod]
  public async Task Receive_No_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var deliveredCount = 0;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          var otherTopicFilter = new TopicFilterBuilder().WithTopic("retained_other").Build();
          await subscriber.SubscribeAsync(otherTopicFilter);

          await Task.Delay(500);

          Assert.AreEqual(0, deliveredCount);
      }
  }
  /// <summary>
  /// Stores a retained message on topic "retained" and asserts that a client subscribing to
  /// that topic afterwards receives exactly one message with the retain flag set.
  /// </summary>
  [TestMethod]
  public async Task Receive_Retained_Message_After_Subscribe()
  {
      using (var environment = new TestEnvironment())
      {
          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();
          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);
          await publisher.DisconnectAsync();

          var inbox = new List<MqttApplicationMessage>();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (inbox)
              {
                  inbox.Add(args.ApplicationMessage);
              }
          });

          var retainedFilter = new TopicFilterBuilder().WithTopic("retained").Build();
          await subscriber.SubscribeAsync(retainedFilter);

          await Task.Delay(500);

          Assert.AreEqual(1, inbox.Count);
          Assert.IsTrue(inbox[0].Retain);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with an
  /// empty payload on the same topic, then asserts a later subscriber receives nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Empty_Payload()
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var setMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(setMessage);

          // Same topic, zero-length payload.
          var clearMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[0])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(clearMessage);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          await Task.Delay(200);

          var filter = new TopicFilter
          {
              Topic = "retained",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, deliveredCount);
      }
  }
  /// <summary>
  /// Publishes a retained message on "retained" followed by a retained message with a
  /// null payload on the same topic, then asserts a later subscriber receives nothing.
  /// </summary>
  [TestMethod]
  public async Task Clear_Retained_Message_With_Null_Payload()
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          await environment.StartServerAsync();

          var publisher = await environment.ConnectClientAsync();

          var setMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(setMessage);

          // Same topic, null payload (the cast selects the byte[] overload).
          var clearMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload((byte[])null)
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(clearMessage);

          await publisher.DisconnectAsync();

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          await Task.Delay(200);

          var filter = new TopicFilter
          {
              Topic = "retained",
              QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
          };
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(500);

          Assert.AreEqual(0, deliveredCount);
      }
  }
  /// <summary>
  /// Configures a server interceptor that replaces every application message with one on
  /// topic "new_topic" and asserts that the subscriber sees that topic instead of the original.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Application_Message()
  {
      using (var environment = new TestEnvironment())
      {
          var brokerOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(context =>
          {
              context.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" };
          });
          await environment.StartServerAsync(brokerOptions);

          string observedTopic = null;

          var mqttClient = await environment.ConnectClientAsync();
          await mqttClient.SubscribeAsync("#");
          mqttClient.UseApplicationMessageReceivedHandler(args =>
          {
              observedTopic = args.ApplicationMessage.Topic;
          });

          var original = new MqttApplicationMessageBuilder().WithTopic("original_topic").Build();
          await mqttClient.PublishAsync(original);

          await Task.Delay(500);

          Assert.AreEqual("new_topic", observedTopic);
      }
  }
  /// <summary>
  /// Starts the server with a <c>TestServerStorage</c> and asserts that publishing one
  /// retained message results in one message held by that storage.
  /// </summary>
  [TestMethod]
  public async Task Persist_Retained_Message()
  {
      var storage = new TestServerStorage();

      using (var environment = new TestEnvironment())
      {
          var brokerOptions = new MqttServerOptionsBuilder().WithStorage(storage);
          await environment.StartServerAsync(brokerOptions);

          var publisher = await environment.ConnectClientAsync();

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retained);

          await Task.Delay(500);

          Assert.AreEqual(1, storage.Messages.Count);
      }
  }
  /// <summary>
  /// Makes the server publish to "/test/1" whenever a client connects and asserts that an
  /// already subscribed client sees that topic after four more clients have connected.
  /// </summary>
  [TestMethod]
  public async Task Publish_After_Client_Connects()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync();
          broker.UseClientConnectedHandler(async e =>
          {
              await broker.PublishAsync("/test/1", "true", MqttQualityOfServiceLevel.ExactlyOnce, false);
          });

          string observedTopic = null;

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              observedTopic = args.ApplicationMessage.Topic;
          });
          await subscriber.SubscribeAsync("#");

          // Each additional connection triggers the server-side publish.
          for (var connection = 0; connection < 4; connection++)
          {
              await environment.ConnectClientAsync();
          }

          await Task.Delay(500);

          Assert.AreEqual("/test/1", observedTopic);
      }
  }
  /// <summary>
  /// Configures a server interceptor that overwrites the payload with "extended" and asserts
  /// that a subscriber on topic "test" receives that payload.
  /// </summary>
  [TestMethod]
  public async Task Intercept_Message()
  {
      using (var environment = new TestEnvironment())
      {
          var brokerOptions = new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(context =>
          {
              context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
          });
          await environment.StartServerAsync(brokerOptions);

          var publisher = await environment.ConnectClientAsync();
          var subscriber = await environment.ConnectClientAsync();

          var testFilter = new TopicFilterBuilder().WithTopic("test").Build();
          await subscriber.SubscribeAsync(testFilter);

          var payloadWasReplaced = false;
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Equals("extended", receivedText, StringComparison.Ordinal);
          });

          var emptyMessage = new MqttApplicationMessageBuilder().WithTopic("test").Build();
          await publisher.PublishAsync(emptyMessage);
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
  }
  /// <summary>
  /// Publishes a 30 MiB payload filled with a repeating pattern of byte values 32..126 and
  /// asserts that the subscriber receives an identical byte sequence.
  /// </summary>
  [TestMethod]
  public async Task Send_Long_Body()
  {
      using (var environment = new TestEnvironment())
      {
          const int PayloadSizeInMB = 30;
          const int PayloadLength = PayloadSizeInMB * 1024 * 1024;
          const int FirstValue = 32;

          // 32..126 inclusive is 95 distinct values; the pattern wraps back to 32 after 126.
          const int PatternLength = 126 - FirstValue + 1;

          var payload = new byte[PayloadLength];
          for (var index = 0; index < payload.Length; index++)
          {
              payload[index] = (byte)(FirstValue + index % PatternLength);
          }

          byte[] receivedPayload = null;

          await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var subscriber = await environment.ConnectClientAsync();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              receivedPayload = args.ApplicationMessage.Payload;
          });
          await subscriber.SubscribeAsync("string");

          var publisher = await environment.ConnectClientAsync();
          await publisher.PublishAsync("string", payload);

          await Task.Delay(500);

          // A missing message is compared as an empty array, which fails the equality check.
          var actual = receivedPayload ?? new byte[0];
          Assert.IsTrue(payload.SequenceEqual(actual));
      }
  }
  /// <summary>
  /// Configures a connection validator that refuses every connection as "not authorized" and
  /// asserts that connecting fails with a <c>MqttConnectingFailedException</c> carrying
  /// <c>MqttClientConnectResultCode.NotAuthorized</c>.
  /// </summary>
  [TestMethod]
  public async Task Deny_Connection()
  {
      var serverOptions = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
      {
          context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
      });

      using (var testEnvironment = new TestEnvironment())
      {
          testEnvironment.IgnoreClientLogErrors = true;

          await testEnvironment.StartServerAsync(serverOptions);

          // The assertions live outside the try block. An Assert.Fail placed inside it would be
          // caught by the general handler and reported as "Wrong exception." instead of the real reason.
          MqttConnectingFailedException connectingFailedException = null;
          try
          {
              await testEnvironment.ConnectClientAsync();
          }
          catch (MqttConnectingFailedException exception)
          {
              connectingFailedException = exception;
          }
          catch (Exception exception)
          {
              Assert.Fail("Wrong exception: " + exception.GetType().FullName);
          }

          Assert.IsNotNull(connectingFailedException, "An exception should be raised.");
          Assert.AreEqual(MqttClientConnectResultCode.NotAuthorized, connectingFailedException.ResultCode);
      }
  }
  /// <summary>
  /// Records the server's connected ("c") and disconnected ("d") events while two clients
  /// with the same client ID connect and disconnect, and asserts the resulting event order.
  /// </summary>
  [TestMethod]
  public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var recorded = new List<string>();

          // Concatenates all events recorded so far, e.g. "cdc".
          string CurrentFlow() => string.Join(string.Empty, recorded);

          broker.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
          {
              lock (recorded)
              {
                  recorded.Add("c");
              }
          });

          broker.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(_ =>
          {
              lock (recorded)
              {
                  recorded.Add("d");
              }
          });

          var sharedOptions = new MqttClientOptionsBuilder()
              .WithClientId("same_id");

          // First connect: "c".
          var firstClient = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);
          Assert.AreEqual("c", CurrentFlow());

          // Second connect with the same ID adds "dc".
          var secondClient = await environment.ConnectClientAsync(sharedOptions);
          await Task.Delay(500);
          Assert.AreEqual("cdc", CurrentFlow());

          // Disconnecting the first client is expected to add nothing.
          await firstClient.DisconnectAsync();
          await Task.Delay(500);

          // Disconnecting the second client adds the final "d".
          await secondClient.DisconnectAsync();
          await Task.Delay(500);

          await broker.StopAsync();

          Assert.AreEqual("cdcd", CurrentFlow());
      }
  }
  /// <summary>
  /// Asserts that the server reports one client status while a client is connected and
  /// none after that client has disconnected.
  /// </summary>
  [TestMethod]
  public async Task Remove_Session()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var options = new MqttClientOptionsBuilder();
          var mqttClient = await environment.ConnectClientAsync(options);
          await Task.Delay(500);

          var statusWhileConnected = await broker.GetClientStatusAsync();
          Assert.AreEqual(1, statusWhileConnected.Count);

          await mqttClient.DisconnectAsync();
          await Task.Delay(500);

          var statusAfterDisconnect = await broker.GetClientStatusAsync();
          Assert.AreEqual(0, statusAfterDisconnect.Count);
      }
  }
  /// <summary>
  /// Asserts that clients can connect while the server runs, cannot connect after it was
  /// stopped, and can connect again after it was restarted on the same port.
  /// </summary>
  [TestMethod]
  public async Task Stop_And_Restart()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          testEnvironment.IgnoreClientLogErrors = true;

          var server = await testEnvironment.StartServerAsync();

          await testEnvironment.ConnectClientAsync();

          await server.StopAsync();

          // The outcome is recorded in a flag and asserted after the try block. An Assert.Fail
          // inside the try would be swallowed by the general catch and the check could never fail.
          var connectingFailed = false;
          try
          {
              await testEnvironment.ConnectClientAsync();
          }
          catch (Exception)
          {
              connectingFailed = true;
          }

          Assert.IsTrue(connectingFailed, "Connecting should fail.");

          await server.StartAsync(new MqttServerOptionsBuilder().WithDefaultEndpointPort(testEnvironment.ServerPort).Build());

          await testEnvironment.ConnectClientAsync();
      }
  }
  /// <summary>
  /// Opens a raw TCP connection to a server configured with a one second communication
  /// timeout, sends nothing, and asserts that the server closed the connection: a later
  /// receive must either return zero bytes or throw a <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Close_Idle_Connection()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

          // The socket is disposed when the test ends so it does not leak into later tests.
          using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
          {
              await client.ConnectAsync("localhost", testEnvironment.ServerPort);

              // Don't send anything. The server should close the connection.
              await Task.Delay(TimeSpan.FromSeconds(3));

              try
              {
                  var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);

                  // Zero bytes means the remote side closed the connection.
                  Assert.AreEqual(0, receivedBytes, "The server should have closed the connection.");
              }
              catch (SocketException)
              {
                  // A socket error on receive is also accepted as proof that the connection is gone.
              }
          }
      }
  }
  /// <summary>
  /// Sends an invalid packet over a raw TCP connection to a server configured with a one
  /// second communication timeout and asserts that the server closed the connection: a later
  /// receive must either return zero bytes or throw a <see cref="SocketException"/>.
  /// </summary>
  [TestMethod]
  public async Task Send_Garbage()
  {
      using (var testEnvironment = new TestEnvironment())
      {
          await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));

          // Send an invalid packet and ensure that the server closes the connection instead of
          // staying in a waiting state forever. This is security related.

          // The socket is disposed when the test ends so it does not leak into later tests.
          using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
          {
              await client.ConnectAsync("localhost", testEnvironment.ServerPort);
              await client.SendAsync(Encoding.UTF8.GetBytes("Garbage"), SocketFlags.None);

              await Task.Delay(TimeSpan.FromSeconds(3));

              try
              {
                  var receivedBytes = await client.ReceiveAsync(new ArraySegment<byte>(new byte[10]), SocketFlags.Partial);

                  // Zero bytes means the remote side closed the connection.
                  Assert.AreEqual(0, receivedBytes, "The server should have closed the connection.");
              }
              catch (SocketException)
              {
                  // A socket error on receive is also accepted as proof that the connection is gone.
              }
          }
      }
  }
  /// <summary>
  /// Stores retained messages on topics "y" and "n", rejects subscriptions to "n" through a
  /// subscription interceptor, and asserts that a client subscribing to both only receives "y".
  /// </summary>
  [TestMethod]
  public async Task Do_Not_Send_Retained_Messages_For_Denied_Subscription()
  {
      using (var environment = new TestEnvironment())
      {
          var brokerOptions = new MqttServerOptionsBuilder().WithSubscriptionInterceptor(context =>
          {
              // This should lead to no subscriptions for "n" at all. So also no sending of retained messages.
              if (context.TopicFilter.Topic == "n")
              {
                  context.AcceptSubscription = false;
              }
          });
          await environment.StartServerAsync(brokerOptions);

          // Prepare some retained messages.
          var publisher = await environment.ConnectClientAsync();
          foreach (var topic in new[] { "y", "n" })
          {
              var retained = new MqttApplicationMessageBuilder()
                  .WithTopic(topic)
                  .WithPayload("x")
                  .WithRetainFlag()
                  .Build();
              await publisher.PublishAsync(retained);
          }

          await publisher.DisconnectAsync();

          await Task.Delay(500);

          // Subscribe to all retained message types.
          // It is important to do this in a range of filters to ensure that a subscription is not "hidden".
          var subscriber = await environment.ConnectClientAsync();

          var receivedTopics = new StringBuilder();
          subscriber.UseApplicationMessageReceivedHandler(args =>
          {
              lock (receivedTopics)
              {
                  receivedTopics.Append(args.ApplicationMessage.Topic);
              }
          });

          var allowedFilter = new TopicFilter { Topic = "y" };
          var deniedFilter = new TopicFilter { Topic = "n" };
          await subscriber.SubscribeAsync(allowedFilter, deniedFilter);

          await Task.Delay(500);

          Assert.AreEqual("y", receivedTopics.ToString());
      }
  }
  /// <summary>
  /// With persistent sessions enabled, client "a" subscribes to "x" and disconnects; client "b"
  /// then publishes three messages to "x". Asserts that no clients remain connected, two
  /// sessions exist, and the session of "a" holds three pending application messages.
  /// </summary>
  [TestMethod]
  public async Task Collect_Messages_In_Disconnected_Session()
  {
      using (var environment = new TestEnvironment())
      {
          var broker = await environment.StartServerAsync(new MqttServerOptionsBuilder().WithPersistentSessions());

          // Create the session including the subscription.
          var subscriber = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("a"));
          await subscriber.SubscribeAsync("x");
          await subscriber.DisconnectAsync();

          await Task.Delay(500);

          var connectedClients = await broker.GetClientStatusAsync();
          Assert.AreEqual(0, connectedClients.Count);

          // Publish while the subscriber is offline.
          var publisher = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
          foreach (var payload in new[] { "1", "2", "3" })
          {
              await publisher.PublishAsync("x", payload);
          }

          await publisher.DisconnectAsync();

          await Task.Delay(500);

          connectedClients = await broker.GetClientStatusAsync();
          var sessions = await broker.GetSessionStatusAsync();

          Assert.AreEqual(0, connectedClients.Count);
          Assert.AreEqual(2, sessions.Count);

          var subscriberSession = sessions.First(s => s.ClientId == "a");
          Assert.AreEqual(3, subscriberSession.PendingApplicationMessagesCount);
      }
  }
  /// <summary>
  /// Shared publish/subscribe scenario: a "receiver" client subscribes with the given filter,
  /// a "sender" client publishes one empty-payload message, and the number of messages seen
  /// by the receiver is compared with the expectation.
  /// </summary>
  /// <param name="topic">Topic the sender publishes to.</param>
  /// <param name="qualityOfServiceLevel">QoS level of the published message.</param>
  /// <param name="topicFilter">Topic filter the receiver subscribes to and later unsubscribes from.</param>
  /// <param name="filterQualityOfServiceLevel">QoS level requested in the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages the receiver is expected to get.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount)
  {
      using (var environment = new TestEnvironment())
      {
          var deliveredCount = 0;

          await environment.StartServerAsync(new MqttServerOptionsBuilder());

          var receiver = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("receiver"));
          receiver.UseApplicationMessageReceivedHandler(_ => Interlocked.Increment(ref deliveredCount));

          var subscription = new TopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await receiver.SubscribeAsync(subscription);

          var sender = await environment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("sender"));

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel)
              .Build();
          await sender.PublishAsync(outgoing);
          await sender.DisconnectAsync().ConfigureAwait(false);

          await Task.Delay(500);
          await receiver.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);

          Assert.AreEqual(expectedReceivedMessagesCount, deliveredCount);
      }
  }
  817. }
  818. }