Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

782 rindas
27 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Protocol;
  5. using MQTTnet.Server;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using MQTTnet.Adapter;
  13. using MQTTnet.Implementations;
  14. namespace MQTTnet.Core.Tests
  15. {
  16. [TestClass]
  17. public class MqttServerTests
  18. {
  /// <summary>
  /// Publishes a single message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.AtMostOnce"/> and expects a
  /// subscriber using the same topic filter and quality of service level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtMostOnce()
  {
      // Awaiting instead of blocking with Wait() lets an assertion failure surface as itself
      // rather than wrapped inside an AggregateException.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          1);
  }
  /// <summary>
  /// Publishes a single message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.AtLeastOnce"/> and expects a
  /// subscriber using the same topic filter and quality of service level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtLeastOnce()
  {
      // Awaiting instead of blocking with Wait() lets an assertion failure surface as itself
      // rather than wrapped inside an AggregateException.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          1);
  }
  /// <summary>
  /// Publishes a single message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.ExactlyOnce"/> and expects a
  /// subscriber using the same topic filter and quality of service level to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_ExactlyOnce()
  {
      // Awaiting instead of blocking with Wait() lets an assertion failure surface as itself
      // rather than wrapped inside an AggregateException.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          1);
  }
  /// <summary>
  /// Connects client "c2" with a will message, disconnects it through <c>DisconnectAsync</c> and asserts
  /// that client "c1", subscribed to "#", did not receive any message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_WillMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();

          var observer = await adapter.ConnectTestClient("c1");
          var willOwner = await adapter.ConnectTestClient("c2", lastWill);

          observer.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var matchEverything = new TopicFilterBuilder().WithTopic("#").Build();
          await observer.SubscribeAsync(matchEverything);

          await willOwner.DisconnectAsync();

          // Leave time for a (wrongly) published will message to reach the observer.
          await Task.Delay(1000);

          await observer.DisconnectAsync();
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Takes client "c1" through the phases "not subscribed", "subscribed" and "unsubscribed" for topic "a"
  /// while client "c2" publishes to it. After each phase the number of received messages is checked, as are
  /// the server's subscribe and unsubscribe events.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_SubscribeUnsubscribe()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");

          subscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var message = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          // Phase 1: no subscription exists yet, so nothing may arrive.
          await publisher.PublishAsync(message);
          await Task.Delay(1000);
          Assert.AreEqual(0, deliveredCount);

          // Phase 2: subscribing must raise the server event and enable delivery.
          var sawSubscribeEvent = false;
          server.ClientSubscribedTopic += (sender, args) =>
          {
              sawSubscribeEvent = args.TopicFilter.Topic == "a" && args.ClientId == "c1";
          };

          var filter = new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce);
          await subscriber.SubscribeAsync(filter);
          await Task.Delay(500);
          Assert.IsTrue(sawSubscribeEvent, "Subscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);

          // Phase 3: unsubscribing must raise the server event and stop delivery.
          var sawUnsubscribeEvent = false;
          server.ClientUnsubscribedTopic += (sender, args) =>
          {
              sawUnsubscribeEvent = args.TopicFilter == "a" && args.ClientId == "c1";
          };

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(500);
          Assert.IsTrue(sawUnsubscribeEvent, "Unsubscribe event not called.");

          await publisher.PublishAsync(message);
          await Task.Delay(1000);
          Assert.AreEqual(1, deliveredCount);
      }
      finally
      {
          await server.StopAsync();
      }

      // The count must still be unchanged some time after the server has stopped.
      await Task.Delay(500);
      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Publishes one message to topic "a" through the server object itself and asserts that client "c1",
  /// which is subscribed to that topic, receives exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          subscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var message = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var filter = new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce);
          await subscriber.SubscribeAsync(filter);

          // The server, not a client, is the publisher here.
          await server.PublishAsync(message);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Connects two TCP clients to a server on "localhost", subscribes both to topic "a" and lets the first
  /// client publish the same message 1000 times (see <c>Publish</c>). Both clients count what they receive
  /// into one shared counter, which is expected to reach 2000.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish_MultipleClients()
  {
      var s = new MqttFactory().CreateMqttServer();
      var receivedMessagesCount = 0;
      var locked = new object();

      var clientOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();
      var clientOptions2 = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      // Created outside the try block so that the finally block can always dispose them.
      var c1 = new MqttFactory().CreateMqttClient();
      var c2 = new MqttFactory().CreateMqttClient();

      try
      {
          await s.StartAsync(new MqttServerOptions());

          await c1.ConnectAsync(clientOptions);
          await c2.ConnectAsync(clientOptions2);

          // Both handlers update the same counter, hence the shared lock.
          c1.ApplicationMessageReceived += (_, __) =>
          {
              lock (locked)
              {
                  receivedMessagesCount++;
              }
          };
          c2.ApplicationMessageReceived += (_, __) =>
          {
              lock (locked)
              {
                  receivedMessagesCount++;
              }
          };

          var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
          await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await c2.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));

          await Publish(c1, message);
          await Task.Delay(500);
      }
      finally
      {
          try
          {
              await s.StopAsync();
          }
          finally
          {
              // Release the client resources even when stopping the server fails.
              c1.Dispose();
              c2.Dispose();
          }
      }

      Assert.AreEqual(2000, receivedMessagesCount);
  }
  /// <summary>
  /// Publishes <paramref name="message"/> 1000 times through <paramref name="c1"/>, awaiting each publish
  /// before the next one is started.
  /// </summary>
  /// <param name="c1">The client used for publishing.</param>
  /// <param name="message">The message that is published repeatedly.</param>
  private static async Task Publish(IMqttClient c1, MqttApplicationMessage message)
  {
      const int PublishCount = 1000;

      var remaining = PublishCount;
      while (remaining > 0)
      {
          await c1.PublishAsync(message);
          remaining--;
      }
  }
  /// <summary>
  /// Connects one TCP client, stops the server and asserts that the client's <c>Disconnected</c> event was
  /// raised exactly once.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ShutdownDisconnectsClientsGracefully()
  {
      var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      var clientOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      var disconnectCalled = 0;

      await s.StartAsync(new MqttServerOptions());

      // Tracks whether the finally block still has to stop the server.
      var serverRunning = true;
      var c1 = new MqttFactory().CreateMqttClient();

      try
      {
          c1.Disconnected += (sender, args) => disconnectCalled++;

          await c1.ConnectAsync(clientOptions);

          await Task.Delay(100);

          await s.StopAsync();
          serverRunning = false;

          await Task.Delay(100);

          // Asserted before the client is disposed so that disposal cannot influence the count.
          Assert.AreEqual(1, disconnectCalled);
      }
      finally
      {
          try
          {
              // Only reached with a running server when a step above threw; keeps later tests from
              // finding the server still listening.
              if (serverRunning)
              {
                  await s.StopAsync();
              }
          }
          finally
          {
              c1.Dispose();
          }
      }
  }
  /// <summary>
  /// Connects and disconnects one TCP client, stops the server and asserts that the server raised
  /// <c>ClientDisconnected</c> as often as it raised <c>ClientConnected</c>.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_HandleCleanDisconnect()
  {
      // Mirrors every published log message to the debug output.
      // NOTE(review): this handler is never detached again by this test - confirm that is acceptable.
      MqttNetGlobalLogger.LogMessagePublished += (_, e) =>
      {
          System.Diagnostics.Debug.WriteLine($"[{e.TraceMessage.Timestamp:s}] {e.TraceMessage.Source} {e.TraceMessage.Message}");
      };

      var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      var clientConnectedCalled = 0;
      var clientDisconnectedCalled = 0;

      s.ClientConnected += (_, __) => clientConnectedCalled++;
      s.ClientDisconnected += (_, __) => clientDisconnectedCalled++;

      var clientOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      await s.StartAsync(new MqttServerOptions());

      // Tracks whether the finally block still has to stop the server.
      var serverRunning = true;
      var c1 = new MqttFactory().CreateMqttClient();

      try
      {
          await c1.ConnectAsync(clientOptions);

          await Task.Delay(100);

          await c1.DisconnectAsync();

          await Task.Delay(100);

          await s.StopAsync();
          serverRunning = false;

          await Task.Delay(100);

          Assert.AreEqual(clientConnectedCalled, clientDisconnectedCalled);
      }
      finally
      {
          try
          {
              // Only reached with a running server when a step above threw; keeps later tests from
              // finding the server still listening.
              if (serverRunning)
              {
                  await s.StopAsync();
              }
          }
          finally
          {
              c1.Dispose();
          }
      }
  }
  /// <summary>
  /// Lets 100 clients (at most 10 in parallel) each clear and set a retained message on their own topic
  /// "r{index}" ten times, then asserts that the server reports exactly one retained message per topic.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_LotsOfRetainedMessages()
  {
      const int ClientCount = 100;

      // Runs the complete life cycle of one client synchronously, because Parallel.For expects
      // a synchronous loop body.
      void RunClient(int clientIndex)
      {
          using (var client = new MqttFactory().CreateMqttClient())
          {
              client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build())
                  .GetAwaiter().GetResult();

              var round = 0;
              while (round < 10)
              {
                  // Clear retained message.
                  var clearMessage = new MqttApplicationMessageBuilder()
                      .WithTopic("r" + clientIndex)
                      .WithPayload(new byte[0])
                      .WithRetainFlag()
                      .Build();
                  client.PublishAsync(clearMessage).GetAwaiter().GetResult();

                  // Set retained message.
                  var setMessage = new MqttApplicationMessageBuilder()
                      .WithTopic("r" + clientIndex)
                      .WithPayload("value" + round)
                      .WithRetainFlag()
                      .Build();
                  client.PublishAsync(setMessage).GetAwaiter().GetResult();

                  round++;
              }

              client.DisconnectAsync().GetAwaiter().GetResult();
          }
      }

      var server = new MqttFactory().CreateMqttServer();

      try
      {
          await server.StartAsync(new MqttServerOptionsBuilder().Build());

          var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
          Parallel.For(0, ClientCount, parallelOptions, i => RunClient(i));

          await Task.Delay(100);

          var retainedMessages = server.GetRetainedMessages();

          Assert.AreEqual(ClientCount, retainedMessages.Count);

          // Every client topic must be present among the retained messages.
          var clientNumber = 0;
          while (clientNumber < ClientCount)
          {
              Assert.IsTrue(retainedMessages.Any(m => m.Topic == "r" + clientNumber));
              clientNumber++;
          }
      }
      finally
      {
          await server.StopAsync();
      }
  }
  /// <summary>
  /// Publishes a retained message on topic "r" with client "c1", then lets client "c2" unsubscribe from and
  /// subscribe to "r" five times. Each new subscription is expected to deliver one more message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessagesFlow()
  {
      var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
      var serverAdapter = new TestMqttServerAdapter();
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      try
      {
          await s.StartAsync(new MqttServerOptions());

          var c1 = await serverAdapter.ConnectTestClient("c1");
          await c1.PublishAsync(retainedMessage);

          // Task.Delay instead of Thread.Sleep: waiting must not block the thread in an async method.
          await Task.Delay(500);
          await c1.DisconnectAsync();
          await Task.Delay(500);

          var receivedMessages = 0;
          var c2 = await serverAdapter.ConnectTestClient("c2");
          c2.ApplicationMessageReceived += (_, e) =>
          {
              receivedMessages++;
          };

          for (var i = 0; i < 5; i++)
          {
              // Unsubscribing must not deliver anything ...
              await c2.UnsubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i, receivedMessages);

              // ... while every new subscription delivers one more message.
              await c2.SubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i + 1, receivedMessages);
          }

          await c2.DisconnectAsync();
      }
      finally
      {
          // Stop the server in all cases, like the other tests of this class do.
          await s.StopAsync();
      }
  }
  /// <summary>
  /// Publishes a message without the retain flag on topic "retained" and asserts that client "c2", which
  /// subscribes to that topic afterwards, receives nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_NoRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          // The publisher leaves before anybody subscribes.
          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3]));
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Publishes a message with the retain flag on topic "retained" and asserts that client "c2", which
  /// subscribes to that topic afterwards, receives exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          // The publisher leaves before anybody subscribes.
          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(server, retained);
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Publishes a retained message on topic "retained", then a retained message with an empty payload on the
  /// same topic, and asserts that client "c2", which subscribes afterwards, receives nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ClearRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient("c1");

          // First a retained message with a payload, then one with an empty payload on the same topic.
          await publisher.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag());
          await publisher.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag());
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          await Task.Delay(200);

          var filter = new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce);
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Publishes a retained message to a server that uses a <c>TestStorage</c> instance, stops it and asserts
  /// that the storage holds one message. A second server created with the same storage and adapter must
  /// then deliver one message to a client subscribing to the topic "retained".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PersistRetainedMessage()
  {
      var storage = new TestStorage();
      var adapter = new TestMqttServerAdapter();

      // First server: receives the retained message.
      var firstServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      try
      {
          var firstOptions = new MqttServerOptions { Storage = storage };
          await firstServer.StartAsync(firstOptions);

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(firstServer, retained);

          await Task.Delay(250);

          await publisher.DisconnectAsync();
      }
      finally
      {
          await firstServer.StopAsync();
      }

      Assert.AreEqual(1, storage.Messages.Count);

      // Second server: started with the same storage object.
      var secondServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          var secondOptions = new MqttServerOptions { Storage = storage };
          await secondServer.StartAsync(secondOptions);

          var subscriber = await adapter.ConnectTestClient("c2");
          subscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await subscriber.SubscribeAsync(filter);

          await Task.Delay(250);
      }
      finally
      {
          await secondServer.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Registers an application message interceptor that replaces the payload with "extended" and asserts
  /// that client "c2", subscribed to "test", receives that payload for a message published by client "c1".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_InterceptMessage()
  {
      // Overwrites the payload of every message handed to the interceptor.
      void ReplacePayload(MqttApplicationMessageInterceptorContext context)
      {
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
      }

      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      try
      {
          var options = new MqttServerOptions { ApplicationMessageInterceptor = ReplacePayload };
          await server.StartAsync(options);

          var publisher = await adapter.ConnectTestClient("c1");
          var subscriber = await adapter.ConnectTestClient("c2");

          var filter = new TopicFilterBuilder().WithTopic("test").Build();
          await subscriber.SubscribeAsync(filter);

          var payloadWasReplaced = false;
          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Equals("extended", receivedText, StringComparison.Ordinal);
          };

          await publisher.PublishAsync(builder => builder.WithTopic("test"));
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
      finally
      {
          await server.StopAsync();
      }
  }
  /// <summary>
  /// Publishes the UTF-8 text "The body" on topic "A" with client "c2" and asserts that client "c1",
  /// subscribed to "A", receives a message whose payload decodes to that text.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Body()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var expectedBodySeen = false;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");

          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              if (receivedText != "The body")
              {
                  return;
              }

              expectedBodySeen = true;
          };

          await subscriber.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);

          var body = Encoding.UTF8.GetBytes("The body");
          await publisher.PublishAsync(builder => builder.WithTopic("A").WithPayload(body));

          await Task.Delay(1000);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.IsTrue(expectedBodySeen);
  }
  /// <summary>
  /// Starts a server whose connection validator answers every connect with
  /// <see cref="MqttConnectReturnCode.ConnectionRefusedNotAuthorized"/> and asserts that connecting a client
  /// throws a <see cref="MqttConnectingFailedException"/>.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ConnectionDenied()
  {
      var server = new MqttFactory().CreateMqttServer();
      var client = new MqttFactory().CreateMqttClient();

      try
      {
          var options = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
          {
              context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
          }).Build();

          await server.StartAsync(options);

          var clientOptions = new MqttClientOptionsBuilder()
              .WithTcpServer("localhost").Build();

          var connectWasRefused = false;
          try
          {
              await client.ConnectAsync(clientOptions);
          }
          catch (MqttConnectingFailedException)
          {
              connectWasRefused = true;
          }
          catch (Exception)
          {
              Assert.Fail("Wrong exception.");
          }

          // Asserted outside the try block: an Assert.Fail placed inside it would be caught by the
          // general handler above and be reported as "Wrong exception." instead.
          Assert.IsTrue(connectWasRefused, "An exception should be raised.");
      }
      finally
      {
          await client.DisconnectAsync();
          await server.StopAsync();

          client.Dispose();
      }
  }
  /// <summary>
  /// Connects two TCP clients that use the same client id one after the other and asserts that the server
  /// never raised <c>ClientConnected</c> while an earlier <c>ClientConnected</c> had not yet been followed
  /// by a <c>ClientDisconnected</c>.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
  {
      var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      // True between a ClientConnected event and the next ClientDisconnected event.
      var connectedClient = false;
      var connectedCalledWhileStillConnected = false;

      s.ClientConnected += (_, __) =>
      {
          connectedCalledWhileStillConnected |= connectedClient;
          connectedClient = true;
      };

      s.ClientDisconnected += (_, __) =>
      {
          connectedClient = false;
      };

      // Both clients share these options and therefore the same client id.
      var clientOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .WithClientId(Guid.NewGuid().ToString())
          .Build();

      await s.StartAsync(new MqttServerOptions());

      // Tracks whether the finally block still has to stop the server.
      var serverRunning = true;
      var c1 = new MqttFactory().CreateMqttClient();
      var c2 = new MqttFactory().CreateMqttClient();

      try
      {
          await c1.ConnectAsync(clientOptions);

          await Task.Delay(100);

          await c2.ConnectAsync(clientOptions);

          await Task.Delay(100);

          await c1.DisconnectAsync();
          await c2.DisconnectAsync();

          await s.StopAsync();
          serverRunning = false;

          await Task.Delay(100);

          Assert.IsFalse(connectedCalledWhileStillConnected, "ClientConnected was called before ClientDisconnect was called");
      }
      finally
      {
          try
          {
              // Only reached with a running server when a step above threw; keeps later tests from
              // finding the server still listening.
              if (serverRunning)
              {
                  await s.StopAsync();
              }
          }
          finally
          {
              c1.Dispose();
              c2.Dispose();
          }
      }
  }
  /// <summary>
  /// Starts a server, connects a client and stops the server again. A second client must then fail to
  /// connect; after the server has been restarted a third client must be able to connect.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_StopAndRestart()
  {
      var server = new MqttFactory().CreateMqttServer();

      var client = new MqttFactory().CreateMqttClient();
      var client2 = new MqttFactory().CreateMqttClient();
      var client3 = new MqttFactory().CreateMqttClient();

      // Tracks whether the finally block still has to stop the server.
      var serverRunning = false;

      try
      {
          await server.StartAsync(new MqttServerOptions());
          serverRunning = true;

          await client.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());

          await server.StopAsync();
          serverRunning = false;

          var connectFailed = false;
          try
          {
              await client2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
          }
          catch (Exception)
          {
              connectFailed = true;
          }

          // Asserted outside the try block: an Assert.Fail placed inside it would itself be swallowed
          // by the general catch, so the check could never fail.
          Assert.IsTrue(connectFailed, "Connecting should fail.");

          await server.StartAsync(new MqttServerOptions());
          serverRunning = true;

          await client3.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
      }
      finally
      {
          try
          {
              // Do not leave the (re)started server running for the tests that follow.
              if (serverRunning)
              {
                  await server.StopAsync();
              }
          }
          finally
          {
              client.Dispose();
              client2.Dispose();
              client3.Dispose();
          }
      }
  }
  /// <summary>
  /// In-memory <see cref="IMqttServerStorage"/> used by the tests: it keeps a reference to the list passed to
  /// the last save call and hands that same list back when loading.
  /// </summary>
  private class TestStorage : IMqttServerStorage
  {
      /// <summary>The list handed over by the most recent save call; an empty list until then.</summary>
      public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();

      /// <summary>Remembers <paramref name="messages"/> (the reference, not a copy) and completes at once.</summary>
      public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
      {
          this.Messages = messages;

          return Task.CompletedTask;
      }

      /// <summary>Returns an already completed task carrying the current <see cref="Messages"/> list.</summary>
      public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() => Task.FromResult(this.Messages);
  }
  /// <summary>
  /// Starts a server, subscribes one TCP client and publishes a single message with an empty payload from
  /// a second TCP client. After the server has been stopped, the number of messages the subscriber received
  /// is compared with <paramref name="expectedReceivedMessagesCount"/>.
  /// </summary>
  /// <param name="topic">Topic the message is published to.</param>
  /// <param name="qualityOfServiceLevel">Quality of service level of the published message.</param>
  /// <param name="topicFilter">Topic filter the subscriber subscribes to and later unsubscribes from.</param>
  /// <param name="filterQualityOfServiceLevel">Quality of service level of the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages the subscriber must have received.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount)
  {
      var s = new MqttFactory().CreateMqttServer();

      // Created outside the try block so that the finally block can always dispose them.
      var c1 = new MqttFactory().CreateMqttClient();
      var c2 = new MqttFactory().CreateMqttClient();

      var receivedMessagesCount = 0;
      try
      {
          await s.StartAsync(new MqttServerOptions());

          c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
          await c1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
          await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());

          await c2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
          await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel));

          await Task.Delay(500);
          await c1.UnsubscribeAsync(topicFilter);

          await Task.Delay(500);
      }
      finally
      {
          try
          {
              await s.StopAsync();
          }
          finally
          {
              // Release the client resources even when stopping the server fails.
              c1.Dispose();
              c2.Dispose();
          }
      }

      Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  }
  610. }
  611. }