You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

470 rivejä
16 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Protocol;
  5. using MQTTnet.Server;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. namespace MQTTnet.Core.Tests
  12. {
  13. [TestClass]
  14. public class MqttServerTests
  15. {
  16. [TestMethod]
  17. public void MqttServer_PublishSimple_AtMostOnce()
  18. {
  19. TestPublishAsync(
  20. "A/B/C",
  21. MqttQualityOfServiceLevel.AtMostOnce,
  22. "A/B/C",
  23. MqttQualityOfServiceLevel.AtMostOnce,
  24. 1).Wait();
  25. }
  26. [TestMethod]
  27. public void MqttServer_PublishSimple_AtLeastOnce()
  28. {
  29. TestPublishAsync(
  30. "A/B/C",
  31. MqttQualityOfServiceLevel.AtLeastOnce,
  32. "A/B/C",
  33. MqttQualityOfServiceLevel.AtLeastOnce,
  34. 1).Wait();
  35. }
  36. [TestMethod]
  37. public void MqttServer_PublishSimple_ExactlyOnce()
  38. {
  39. TestPublishAsync(
  40. "A/B/C",
  41. MqttQualityOfServiceLevel.ExactlyOnce,
  42. "A/B/C",
  43. MqttQualityOfServiceLevel.ExactlyOnce,
  44. 1).Wait();
  45. }
  46. [TestMethod]
  47. public async Task MqttServer_WillMessage()
  48. {
  49. var serverAdapter = new TestMqttServerAdapter();
  50. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  51. var receivedMessagesCount = 0;
  52. try
  53. {
  54. await s.StartAsync(new MqttServerOptions());
  55. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  56. var c1 = await serverAdapter.ConnectTestClient("c1");
  57. var c2 = await serverAdapter.ConnectTestClient("c2", willMessage);
  58. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  59. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
  60. await c2.DisconnectAsync();
  61. await Task.Delay(1000);
  62. await c1.DisconnectAsync();
  63. }
  64. finally
  65. {
  66. await s.StopAsync();
  67. }
  68. Assert.AreEqual(0, receivedMessagesCount);
  69. }
  70. [TestMethod]
  71. public async Task MqttServer_SubscribeUnsubscribe()
  72. {
  73. var serverAdapter = new TestMqttServerAdapter();
  74. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  75. var receivedMessagesCount = 0;
  76. try
  77. {
  78. await s.StartAsync(new MqttServerOptions());
  79. var c1 = await serverAdapter.ConnectTestClient("c1");
  80. var c2 = await serverAdapter.ConnectTestClient("c2");
  81. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  82. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  83. await c2.PublishAsync(message);
  84. await Task.Delay(1000);
  85. Assert.AreEqual(0, receivedMessagesCount);
  86. var subscribeEventCalled = false;
  87. s.ClientSubscribedTopic += (_, e) =>
  88. {
  89. subscribeEventCalled = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
  90. };
  91. await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
  92. await Task.Delay(500);
  93. Assert.IsTrue(subscribeEventCalled, "Subscribe event not called.");
  94. await c2.PublishAsync(message);
  95. await Task.Delay(500);
  96. Assert.AreEqual(1, receivedMessagesCount);
  97. var unsubscribeEventCalled = false;
  98. s.ClientUnsubscribedTopic += (_, e) =>
  99. {
  100. unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
  101. };
  102. await c1.UnsubscribeAsync("a");
  103. await Task.Delay(500);
  104. Assert.IsTrue(unsubscribeEventCalled, "Unsubscribe event not called.");
  105. await c2.PublishAsync(message);
  106. await Task.Delay(1000);
  107. Assert.AreEqual(1, receivedMessagesCount);
  108. }
  109. finally
  110. {
  111. await s.StopAsync();
  112. }
  113. await Task.Delay(500);
  114. Assert.AreEqual(1, receivedMessagesCount);
  115. }
  116. [TestMethod]
  117. public async Task MqttServer_Publish()
  118. {
  119. var serverAdapter = new TestMqttServerAdapter();
  120. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  121. var receivedMessagesCount = 0;
  122. try
  123. {
  124. await s.StartAsync(new MqttServerOptions());
  125. var c1 = await serverAdapter.ConnectTestClient("c1");
  126. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  127. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  128. await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
  129. await s.PublishAsync(message);
  130. await Task.Delay(500);
  131. }
  132. finally
  133. {
  134. await s.StopAsync();
  135. }
  136. Assert.AreEqual(1, receivedMessagesCount);
  137. }
  138. [TestMethod]
  139. public async Task MqttServer_RetainedMessagesFlow()
  140. {
  141. var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
  142. var serverAdapter = new TestMqttServerAdapter();
  143. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  144. await s.StartAsync(new MqttServerOptions());
  145. var c1 = await serverAdapter.ConnectTestClient("c1");
  146. await c1.PublishAsync(retainedMessage);
  147. Thread.Sleep(500);
  148. await c1.DisconnectAsync();
  149. Thread.Sleep(500);
  150. var receivedMessages = 0;
  151. var c2 = await serverAdapter.ConnectTestClient("c2");
  152. c2.ApplicationMessageReceived += (_, e) =>
  153. {
  154. receivedMessages++;
  155. };
  156. for (var i = 0; i < 5; i++)
  157. {
  158. await c2.UnsubscribeAsync("r");
  159. await Task.Delay(500);
  160. Assert.AreEqual(i, receivedMessages);
  161. await c2.SubscribeAsync("r");
  162. await Task.Delay(500);
  163. Assert.AreEqual(i + 1, receivedMessages);
  164. }
  165. await c2.DisconnectAsync();
  166. }
  167. [TestMethod]
  168. public async Task MqttServer_NoRetainedMessage()
  169. {
  170. var serverAdapter = new TestMqttServerAdapter();
  171. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  172. var receivedMessagesCount = 0;
  173. try
  174. {
  175. await s.StartAsync(new MqttServerOptions());
  176. var c1 = await serverAdapter.ConnectTestClient("c1");
  177. await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3]));
  178. await c1.DisconnectAsync();
  179. var c2 = await serverAdapter.ConnectTestClient("c2");
  180. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  181. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  182. await Task.Delay(500);
  183. }
  184. finally
  185. {
  186. await s.StopAsync();
  187. }
  188. Assert.AreEqual(0, receivedMessagesCount);
  189. }
  190. [TestMethod]
  191. public async Task MqttServer_RetainedMessage()
  192. {
  193. var serverAdapter = new TestMqttServerAdapter();
  194. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  195. var receivedMessagesCount = 0;
  196. try
  197. {
  198. await s.StartAsync(new MqttServerOptions());
  199. var c1 = await serverAdapter.ConnectTestClient("c1");
  200. await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  201. await c1.DisconnectAsync();
  202. var c2 = await serverAdapter.ConnectTestClient("c2");
  203. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  204. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  205. await Task.Delay(500);
  206. }
  207. finally
  208. {
  209. await s.StopAsync();
  210. }
  211. Assert.AreEqual(1, receivedMessagesCount);
  212. }
  213. [TestMethod]
  214. public async Task MqttServer_ClearRetainedMessage()
  215. {
  216. var serverAdapter = new TestMqttServerAdapter();
  217. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  218. var receivedMessagesCount = 0;
  219. try
  220. {
  221. await s.StartAsync(new MqttServerOptions());
  222. var c1 = await serverAdapter.ConnectTestClient("c1");
  223. await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag());
  224. await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag());
  225. await c1.DisconnectAsync();
  226. var c2 = await serverAdapter.ConnectTestClient("c2");
  227. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  228. await Task.Delay(200);
  229. await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
  230. await Task.Delay(500);
  231. }
  232. finally
  233. {
  234. await s.StopAsync();
  235. }
  236. Assert.AreEqual(0, receivedMessagesCount);
  237. }
  238. [TestMethod]
  239. public async Task MqttServer_PersistRetainedMessage()
  240. {
  241. var storage = new TestStorage();
  242. var serverAdapter = new TestMqttServerAdapter();
  243. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  244. try
  245. {
  246. var options = new MqttServerOptions { Storage = storage };
  247. await s.StartAsync(options);
  248. var c1 = await serverAdapter.ConnectTestClient("c1");
  249. await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  250. await c1.DisconnectAsync();
  251. }
  252. finally
  253. {
  254. await s.StopAsync();
  255. }
  256. Assert.AreEqual(1, storage.Messages.Count);
  257. s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  258. var receivedMessagesCount = 0;
  259. try
  260. {
  261. var options = new MqttServerOptions { Storage = storage };
  262. await s.StartAsync(options);
  263. var c2 = await serverAdapter.ConnectTestClient("c2");
  264. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  265. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
  266. await Task.Delay(500);
  267. }
  268. finally
  269. {
  270. await s.StopAsync();
  271. }
  272. Assert.AreEqual(1, receivedMessagesCount);
  273. }
  274. [TestMethod]
  275. public async Task MqttServer_InterceptMessage()
  276. {
  277. void Interceptor(MqttApplicationMessageInterceptorContext context)
  278. {
  279. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  280. }
  281. var serverAdapter = new TestMqttServerAdapter();
  282. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  283. try
  284. {
  285. var options = new MqttServerOptions { ApplicationMessageInterceptor = Interceptor };
  286. await s.StartAsync(options);
  287. var c1 = await serverAdapter.ConnectTestClient("c1");
  288. var c2 = await serverAdapter.ConnectTestClient("c2");
  289. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  290. var isIntercepted = false;
  291. c2.ApplicationMessageReceived += (sender, args) =>
  292. {
  293. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  294. };
  295. await c1.PublishAsync(builder => builder.WithTopic("test"));
  296. await c1.DisconnectAsync();
  297. await Task.Delay(500);
  298. Assert.IsTrue(isIntercepted);
  299. }
  300. finally
  301. {
  302. await s.StopAsync();
  303. }
  304. }
  305. [TestMethod]
  306. public async Task MqttServer_Body()
  307. {
  308. var serverAdapter = new TestMqttServerAdapter();
  309. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  310. var bodyIsMatching = false;
  311. try
  312. {
  313. await s.StartAsync(new MqttServerOptions());
  314. var c1 = await serverAdapter.ConnectTestClient("c1");
  315. var c2 = await serverAdapter.ConnectTestClient("c2");
  316. c1.ApplicationMessageReceived += (_, e) =>
  317. {
  318. if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload) == "The body")
  319. {
  320. bodyIsMatching = true;
  321. }
  322. };
  323. await c1.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);
  324. await c2.PublishAsync(builder => builder.WithTopic("A").WithPayload(Encoding.UTF8.GetBytes("The body")));
  325. await Task.Delay(1000);
  326. }
  327. finally
  328. {
  329. await s.StopAsync();
  330. }
  331. Assert.IsTrue(bodyIsMatching);
  332. }
  333. private class TestStorage : IMqttServerStorage
  334. {
  335. public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();
  336. public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
  337. {
  338. Messages = messages;
  339. return Task.CompletedTask;
  340. }
  341. public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
  342. {
  343. return Task.FromResult(Messages);
  344. }
  345. }
  346. private static async Task TestPublishAsync(
  347. string topic,
  348. MqttQualityOfServiceLevel qualityOfServiceLevel,
  349. string topicFilter,
  350. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  351. int expectedReceivedMessagesCount)
  352. {
  353. var serverAdapter = new TestMqttServerAdapter();
  354. var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
  355. var receivedMessagesCount = 0;
  356. try
  357. {
  358. await s.StartAsync(new MqttServerOptions());
  359. var c1 = await serverAdapter.ConnectTestClient("c1");
  360. var c2 = await serverAdapter.ConnectTestClient("c2");
  361. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  362. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  363. await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel));
  364. await Task.Delay(500);
  365. await c1.UnsubscribeAsync(topicFilter);
  366. await Task.Delay(500);
  367. }
  368. finally
  369. {
  370. await s.StopAsync();
  371. }
  372. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  373. }
  374. }
  375. }