You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

568 lines
20 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Protocol;
  5. using MQTTnet.Server;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using MQTTnet.Implementations;
  12. namespace MQTTnet.Core.Tests
  13. {
  14. [TestClass]
  15. public class MqttServerTests
  16. {
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS AtMostOnce to a subscriber whose filter is
  /// "A/B/C" with QoS AtMostOnce and expects exactly one received message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtMostOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion surfaces directly
      // rather than wrapped in an AggregateException, and no thread is blocked.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS AtLeastOnce to a subscriber whose filter is
  /// "A/B/C" with QoS AtLeastOnce and expects exactly one received message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtLeastOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion surfaces directly
      // rather than wrapped in an AggregateException, and no thread is blocked.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS ExactlyOnce to a subscriber whose filter is
  /// "A/B/C" with QoS ExactlyOnce and expects exactly one received message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_ExactlyOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion surfaces directly
      // rather than wrapped in an AggregateException, and no thread is blocked.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          1);
  }
  /// <summary>
  /// Connects client "c2" with a will message on "My/last/will", disconnects it via
  /// DisconnectAsync, and expects that client "c1" (subscribed to "#") receives nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_WillMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();

          var observer = await adapter.ConnectTestClient("c1");
          var willOwner = await adapter.ConnectTestClient("c2", lastWill);

          observer.ApplicationMessageReceived += (_, __) => received++;
          await observer.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());

          // NOTE(review): presumably an orderly disconnect must not trigger the will - confirm.
          await willOwner.DisconnectAsync();

          // Leave time for a (wrongly) delivered will message to arrive before checking.
          await Task.Delay(1000);
          await observer.DisconnectAsync();
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, received);
  }
  /// <summary>
  /// Walks client "c1" through "not subscribed -> subscribed -> unsubscribed" on topic "a"
  /// while client "c2" publishes, checking the received-message count after every step and
  /// that the server raises its subscribe/unsubscribe events for "c1" and topic "a".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_SubscribeUnsubscribe()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");
          subscriber.ApplicationMessageReceived += (_, __) => received++;

          var payload = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          // Step 1: no subscription yet, so the published message must not arrive.
          await publisher.PublishAsync(payload);
          await Task.Delay(1000);
          Assert.AreEqual(0, received);

          // Step 2: subscribe; the server event must report topic "a" for client "c1".
          var sawSubscribeEvent = false;
          server.ClientSubscribedTopic += (_, e) =>
          {
              sawSubscribeEvent = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
          };

          await subscriber.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await Task.Delay(500);
          Assert.IsTrue(sawSubscribeEvent, "Subscribe event not called.");

          // Step 3: with the subscription active, one publish yields one received message.
          await publisher.PublishAsync(payload);
          await Task.Delay(500);
          Assert.AreEqual(1, received);

          // Step 4: unsubscribe; the server event must report "a" for client "c1".
          var sawUnsubscribeEvent = false;
          server.ClientUnsubscribedTopic += (_, e) =>
          {
              sawUnsubscribeEvent = e.TopicFilter == "a" && e.ClientId == "c1";
          };

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(500);
          Assert.IsTrue(sawUnsubscribeEvent, "Unsubscribe event not called.");

          // Step 5: after unsubscribing, a further publish must not change the count.
          await publisher.PublishAsync(payload);
          await Task.Delay(1000);
          Assert.AreEqual(1, received);
      }
      finally
      {
          await server.StopAsync();
      }

      // The count must still be unchanged after the server has been stopped.
      await Task.Delay(500);
      Assert.AreEqual(1, received);
  }
  /// <summary>
  /// Publishes a message on topic "a" through the server object itself and expects the
  /// subscribed client "c1" to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          subscriber.ApplicationMessageReceived += (_, __) => received++;

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          await subscriber.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));

          // The server, not a client, is the publisher here.
          await server.PublishAsync(outgoing);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, received);
  }
  /// <summary>
  /// Connects two TCP clients to a server on "localhost", subscribes both to topic "a",
  /// lets the first client publish via <c>Publish</c>, and expects 2000 received messages
  /// in total across both clients.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish_MultipleClients()
  {
      var tcpAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var server = new MqttFactory().CreateMqttServer(new[] { tcpAdapter }, new MqttNetLogger());
      var totalReceived = 0;

      // One options instance per client, exactly as before.
      // NOTE(review): presumably each instance carries its own generated client id - confirm
      // before sharing a single instance between the clients.
      var firstOptions = new MqttClientOptionsBuilder().WithTcpServer("localhost").Build();
      var secondOptions = new MqttClientOptionsBuilder().WithTcpServer("localhost").Build();

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var firstClient = new MqttFactory().CreateMqttClient();
          var secondClient = new MqttFactory().CreateMqttClient();
          await firstClient.ConnectAsync(firstOptions);
          await secondClient.ConnectAsync(secondOptions);

          // Both handlers update the same counter, so the increment is done atomically.
          firstClient.ApplicationMessageReceived += (_, __) => Interlocked.Increment(ref totalReceived);
          secondClient.ApplicationMessageReceived += (_, __) => Interlocked.Increment(ref totalReceived);

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          await firstClient.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await secondClient.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));

          // Only the first client publishes; publishing from both clients concurrently is
          // not exercised by this test.
          await Publish(firstClient, outgoing);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(2000, totalReceived);
  }
  /// <summary>
  /// Publishes <paramref name="message"/> through <paramref name="c1"/> 1000 times,
  /// awaiting each publish before starting the next.
  /// </summary>
  /// <param name="c1">Client used to publish.</param>
  /// <param name="message">Message that is sent on every iteration.</param>
  private static async Task Publish(IMqttClient c1, MqttApplicationMessage message)
  {
      var remaining = 1000;
      while (remaining > 0)
      {
          await c1.PublishAsync(message);
          remaining--;
      }
  }
  /// <summary>
  /// Connects a TCP client to a server on "localhost", stops the server, and expects the
  /// client's Disconnected event to have fired.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ShutdownDisconnectsClientsGracefully()
  {
      var serverAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      var clientOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      var disconnectCalled = false;

      // Tracks whether the stop under test was already issued, so the cleanup below only
      // runs when the test failed before reaching it.
      var stopRequested = false;

      try
      {
          await s.StartAsync(new MqttServerOptions());

          var c1 = new MqttFactory().CreateMqttClient();
          c1.Disconnected += (sender, args) => disconnectCalled = true;

          await c1.ConnectAsync(clientOptions);
          await Task.Delay(100);

          // Stopping the server is the action under test.
          stopRequested = true;
          await s.StopAsync();

          await Task.Delay(100);
      }
      finally
      {
          // Without this, a failure before the stop above (e.g. in ConnectAsync) would leave
          // the TCP server running for the tests that follow.
          if (!stopRequested)
          {
              await s.StopAsync();
          }
      }

      Assert.IsTrue(disconnectCalled);
  }
  /// <summary>
  /// Publishes a retained message on topic "r" from client "c1", then has client "c2"
  /// unsubscribe and re-subscribe to "r" five times, expecting exactly one additional
  /// received message after every subscribe.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessagesFlow()
  {
      var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
      var serverAdapter = new TestMqttServerAdapter();
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      try
      {
          await s.StartAsync(new MqttServerOptions());

          var c1 = await serverAdapter.ConnectTestClient("c1");
          await c1.PublishAsync(retainedMessage);

          // Task.Delay instead of Thread.Sleep: waits without blocking the thread.
          await Task.Delay(500);
          await c1.DisconnectAsync();
          await Task.Delay(500);

          var receivedMessages = 0;
          var c2 = await serverAdapter.ConnectTestClient("c2");
          c2.ApplicationMessageReceived += (_, __) => receivedMessages++;

          for (var i = 0; i < 5; i++)
          {
              // Unsubscribing must not deliver anything: the count stays at i.
              await c2.UnsubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i, receivedMessages);

              // Every new subscription must deliver one more message.
              await c2.SubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i + 1, receivedMessages);
          }

          await c2.DisconnectAsync();
      }
      finally
      {
          // The server was previously never stopped; stop it even when an assertion fails,
          // matching the other tests in this class.
          await s.StopAsync();
      }
  }
  /// <summary>
  /// Publishes a message on topic "retained" without the retain flag, then connects a
  /// second client that subscribes to that topic and expects it to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_NoRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          // Note: no WithRetainFlag() on this publish.
          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[3]));
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (_, __) => received++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, received);
  }
  /// <summary>
  /// Publishes a message on topic "retained" with the retain flag set, then connects a
  /// second client that subscribes to that topic and expects it to receive one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(server, retained);
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (_, __) => received++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, received);
  }
  /// <summary>
  /// Publishes a retained message with a 3-byte payload on topic "retained", follows it
  /// with a retained message carrying an empty payload on the same topic, and expects a
  /// client that subscribes afterwards to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ClearRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient("c1");

          // First a retained message with content, then a retained message with an empty payload.
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag());
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag());
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (_, __) => received++;

          await Task.Delay(200);

          var filter = new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce);
          await lateSubscriber.SubscribeAsync(filter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, received);
  }
  /// <summary>
  /// Runs two servers one after the other against the same <c>TestStorage</c>: the first
  /// receives a retained message (expected to end up as one stored message), the second
  /// is expected to deliver one message to a client subscribing to topic "retained".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PersistRetainedMessage()
  {
      var sharedStorage = new TestStorage();
      var adapter = new TestMqttServerAdapter();

      // Phase 1: publish a retained message to a server backed by the storage.
      var firstServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      try
      {
          await firstServer.StartAsync(new MqttServerOptions { Storage = sharedStorage });

          var retained = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(firstServer, retained);
          await Task.Delay(250);
          await publisher.DisconnectAsync();
      }
      finally
      {
          await firstServer.StopAsync();
      }

      Assert.AreEqual(1, sharedStorage.Messages.Count);

      // Phase 2: a new server instance using the same adapter and storage.
      var secondServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;
      try
      {
          await secondServer.StartAsync(new MqttServerOptions { Storage = sharedStorage });

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (_, __) => received++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);
          await Task.Delay(250);
      }
      finally
      {
          await secondServer.StopAsync();
      }

      Assert.AreEqual(1, received);
  }
  /// <summary>
  /// Starts a server with an application message interceptor that overwrites the payload
  /// with "extended" and expects the subscriber on topic "test" to see that payload.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_InterceptMessage()
  {
      // Interceptor under test: replaces the payload of every intercepted message.
      void OverwritePayload(MqttApplicationMessageInterceptorContext context)
      {
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
      }

      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      try
      {
          await server.StartAsync(new MqttServerOptions { ApplicationMessageInterceptor = OverwritePayload });

          var publisher = await adapter.ConnectTestClient("c1");
          var subscriber = await adapter.ConnectTestClient("c2");
          await subscriber.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());

          var sawInterceptedPayload = false;
          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var text = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              sawInterceptedPayload = string.Equals("extended", text, StringComparison.Ordinal);
          };

          // The message is published without a payload; only the interceptor supplies one.
          await publisher.PublishAsync(b => b.WithTopic("test"));
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(sawInterceptedPayload);
      }
      finally
      {
          await server.StopAsync();
      }
  }
  /// <summary>
  /// Publishes the UTF-8 text "The body" on topic "A" from client "c2" and expects client
  /// "c1", subscribed to "A", to receive a message whose payload decodes to the same text.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Body()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var payloadMatched = false;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");

          subscriber.ApplicationMessageReceived += (_, e) =>
          {
              var text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

              // Only ever switches the flag on; a non-matching message does not reset it.
              if (text == "The body")
              {
                  payloadMatched = true;
              }
          };

          await subscriber.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);

          var bodyBytes = Encoding.UTF8.GetBytes("The body");
          await publisher.PublishAsync(b => b.WithTopic("A").WithPayload(bodyBytes));

          await Task.Delay(1000);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.IsTrue(payloadMatched);
  }
  /// <summary>
  /// In-memory <see cref="IMqttServerStorage"/> used by the tests: keeps a reference to
  /// the most recently saved list and hands that same list back on load.
  /// </summary>
  private class TestStorage : IMqttServerStorage
  {
      // Starts out as an empty list; replaced by whatever list is passed to Save.
      public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();

      /// <summary>Stores the given list reference (no copy is made).</summary>
      public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
      {
          Messages = messages;
          return Task.CompletedTask;
      }

      /// <summary>Returns the currently stored list wrapped in a completed task.</summary>
      public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() => Task.FromResult(Messages);
  }
  /// <summary>
  /// Shared scenario for the simple publish tests: client "c1" subscribes with
  /// <paramref name="topicFilter"/>, client "c2" publishes one empty-payload message on
  /// <paramref name="topic"/>, and the number of messages "c1" received is asserted.
  /// </summary>
  /// <param name="topic">Topic the message is published on.</param>
  /// <param name="qualityOfServiceLevel">QoS level of the published message.</param>
  /// <param name="topicFilter">Topic filter used for the subscription.</param>
  /// <param name="filterQualityOfServiceLevel">QoS level of the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages "c1" must have received.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount)
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var received = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");
          subscriber.ApplicationMessageReceived += (_, __) => received++;

          var subscription = new TopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await subscriber.SubscribeAsync(subscription);

          await publisher.PublishAsync(b => b
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel));
          await Task.Delay(500);

          await subscriber.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(expectedReceivedMessagesCount, received);
  }
  448. }
  449. }