You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

439 lines
15 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using System.Threading.Tasks;
  5. using Microsoft.VisualStudio.TestTools.UnitTesting;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Protocol;
  8. using MQTTnet.Server;
  9. using MQTTnet.Client;
  10. namespace MQTTnet.Core.Tests
  11. {
  12. [TestClass]
  13. public class MqttServerTests
  14. {
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS AtMostOnce to a subscriber of
  /// "A/B/C" (QoS AtMostOnce) and expects exactly one delivery.
  /// </summary>
  /// <remarks>
  /// Declared as <c>async Task</c> and awaited instead of blocking on
  /// <c>Wait()</c>, so an assertion failure raised inside
  /// <c>TestPublishAsync</c> surfaces directly rather than wrapped in an
  /// <see cref="AggregateException"/>, matching the other async tests here.
  /// </remarks>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtMostOnce()
  {
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS AtLeastOnce to a subscriber of
  /// "A/B/C" (QoS AtLeastOnce) and expects exactly one delivery.
  /// </summary>
  /// <remarks>
  /// Declared as <c>async Task</c> and awaited instead of blocking on
  /// <c>Wait()</c>, so an assertion failure raised inside
  /// <c>TestPublishAsync</c> surfaces directly rather than wrapped in an
  /// <see cref="AggregateException"/>, matching the other async tests here.
  /// </remarks>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtLeastOnce()
  {
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with QoS ExactlyOnce to a subscriber of
  /// "A/B/C" (QoS ExactlyOnce) and expects exactly one delivery.
  /// </summary>
  /// <remarks>
  /// Declared as <c>async Task</c> and awaited instead of blocking on
  /// <c>Wait()</c>, so an assertion failure raised inside
  /// <c>TestPublishAsync</c> surfaces directly rather than wrapped in an
  /// <see cref="AggregateException"/>, matching the other async tests here.
  /// </remarks>
  [TestMethod]
  public async Task MqttServer_PublishSimple_ExactlyOnce()
  {
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          1);
  }
  /// <summary>
  /// Connects client "c2" with a will message on "My/last/will", disconnects it,
  /// and expects client "c1" (subscribed to "#") to have received exactly one
  /// message by the time the server is stopped.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_WillMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();

          var observer = await adapter.ConnectTestClient(server, "c1");
          var willOwner = await adapter.ConnectTestClient(server, "c2", lastWill);

          observer.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };
          await observer.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());

          await willOwner.DisconnectAsync();

          // Delivery is not awaited explicitly; a fixed pause precedes the check.
          await Task.Delay(1000);

          await observer.DisconnectAsync();
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Walks client "c1" through "not subscribed", "subscribed" and "unsubscribed"
  /// for topic "a" while client "c2" publishes the same message in each phase.
  /// Only the publish made while subscribed may be counted, and the server's
  /// subscribe/unsubscribe events must report topic "a" for client "c1".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_SubscribeUnsubscribe()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient(server, "c1");
          var publisher = await adapter.ConnectTestClient(server, "c2");
          subscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var payloadMessage = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          // Phase 1: nothing is subscribed yet, so the publish must not be counted.
          await publisher.PublishAsync(payloadMessage);
          Assert.AreEqual(0, deliveredCount);

          // Phase 2: subscribe and verify both the server event and the delivery.
          var sawSubscribeEvent = false;
          server.ClientSubscribedTopic += (sender, e) =>
          {
              sawSubscribeEvent = e.TopicFilter.Topic == "a" && e.ClientId == "c1";
          };

          await subscriber.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await Task.Delay(100);
          Assert.IsTrue(sawSubscribeEvent, "Subscribe event not called.");

          await publisher.PublishAsync(payloadMessage);
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);

          // Phase 3: unsubscribe; the counter must stay at one after another publish.
          var sawUnsubscribeEvent = false;
          server.ClientUnsubscribedTopic += (sender, e) =>
          {
              sawUnsubscribeEvent = e.TopicFilter == "a" && e.ClientId == "c1";
          };

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(100);
          Assert.IsTrue(sawUnsubscribeEvent, "Unsubscribe event not called.");

          await publisher.PublishAsync(payloadMessage);
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);
      }
      finally
      {
          await server.StopAsync();
      }

      // Final check after shutdown: no late delivery may have changed the count.
      await Task.Delay(500);
      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Publishes a message on topic "a" through the server's own PublishAsync and
  /// expects client "c1", subscribed to "a", to receive it exactly once.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish()
  {
      var serverAdapter = new TestMqttServerAdapter();
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      var receivedMessagesCount = 0;
      try
      {
          await s.StartAsync(new MqttServerOptions());

          var c1 = await serverAdapter.ConnectTestClient(s, "c1");
          c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;

          var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
          await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));

          // Awaited rather than blocked on with Wait(): this method is already
          // async, and awaiting avoids AggregateException wrapping on failure.
          await s.PublishAsync(message);

          // Delivery is not awaited explicitly; a fixed pause precedes the check.
          await Task.Delay(500);
      }
      finally
      {
          await s.StopAsync();
      }

      Assert.AreEqual(1, receivedMessagesCount);
  }
  /// <summary>
  /// Client "c1" publishes a message on "retained" without the retain flag and
  /// disconnects. Client "c2" then subscribes to "retained" and is expected to
  /// receive nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_NoRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient(server, "c1");
          var plainMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .Build();
          await publisher.PublishAsync(plainMessage);
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient(server, "c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };
          await lateSubscriber.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());

          // Fixed pause so any (unexpected) delivery would be counted before the check.
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Client "c1" publishes a message on "retained" with the retain flag and
  /// disconnects. After a two-second pause, client "c2" subscribes to "retained"
  /// and is expected to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient(server, "c1");
          var retainedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retainedMessage);
          await publisher.DisconnectAsync();

          // TODO: Find another way to wait for the retained components.
          await Task.Delay(TimeSpan.FromSeconds(2));

          var lateSubscriber = await adapter.ConnectTestClient(server, "c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };
          await lateSubscriber.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());

          // Fixed pause before the received-count check.
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Client "c1" publishes a retained message on "retained" with a three-byte
  /// payload, then a second retained message on the same topic with an empty
  /// payload, and disconnects. Client "c2" then subscribes to "retained" and is
  /// expected to receive nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ClearRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient(server, "c1");

          var retainedWithPayload = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retainedWithPayload);

          // Same topic, retain flag set, zero-length payload.
          var retainedEmpty = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[0])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retainedEmpty);

          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient(server, "c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };
          await lateSubscriber.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));

          // Fixed pause so any (unexpected) delivery would be counted before the check.
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Runs two server instances in sequence that share one <c>TestStorage</c>.
  /// The first receives a retained message on "retained" from client "c1" and is
  /// stopped. The second is started with the same storage, and client "c2"
  /// subscribing to "retained" is expected to receive exactly one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PersistRetainedMessage()
  {
      var sharedStorage = new TestStorage();
      var adapter = new TestMqttServerAdapter();

      // First server lifetime: publish the retained message.
      var firstServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      try
      {
          await firstServer.StartAsync(new MqttServerOptions { Storage = sharedStorage });

          var publisher = await adapter.ConnectTestClient(firstServer, "c1");
          var retainedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();
          await publisher.PublishAsync(retainedMessage);
          await publisher.DisconnectAsync();
      }
      finally
      {
          await firstServer.StopAsync();
      }

      // TODO: Find another way to wait for the retained components.
      await Task.Delay(TimeSpan.FromSeconds(2));

      // Second server lifetime: same adapter and storage, fresh server instance.
      var secondServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await secondServer.StartAsync(new MqttServerOptions { Storage = sharedStorage });

          var lateSubscriber = await adapter.ConnectTestClient(secondServer, "c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };
          await lateSubscriber.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());

          // Fixed pause before the received-count check.
          await Task.Delay(500);
      }
      finally
      {
          await secondServer.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Starts a server whose application message interceptor replaces every payload
  /// with the ASCII bytes of "extended". Client "c1" publishes on "test", and the
  /// message seen by client "c2" must decode to "extended".
  /// </summary>
  [TestMethod]
  public async Task MqttServer_InterceptMessage()
  {
      // Interceptor under test: overwrites the payload of the intercepted message.
      void ReplacePayload(MqttApplicationMessageInterceptorContext context) =>
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");

      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      try
      {
          await server.StartAsync(new MqttServerOptions { ApplicationMessageInterceptor = ReplacePayload });

          var publisher = await adapter.ConnectTestClient(server, "c1");
          var subscriber = await adapter.ConnectTestClient(server, "c2");
          await subscriber.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());

          var payloadWasReplaced = false;
          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Equals("extended", receivedText, StringComparison.Ordinal);
          };

          var outgoing = new MqttApplicationMessageBuilder().WithTopic("test").Build();
          await publisher.PublishAsync(outgoing);
          await publisher.DisconnectAsync();

          // Fixed pause before the flag is checked.
          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
      finally
      {
          await server.StopAsync();
      }
  }
  /// <summary>
  /// Client "c2" publishes the UTF-8 text "The body" on topic "A". Client "c1",
  /// subscribed to "A", must receive a message whose payload decodes to that text.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Body()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var sawExpectedBody = false;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient(server, "c1");
          var publisher = await adapter.ConnectTestClient(server, "c2");

          subscriber.ApplicationMessageReceived += (sender, e) =>
          {
              var receivedText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
              if (string.Equals(receivedText, "The body", StringComparison.Ordinal))
              {
                  // Latches to true; later non-matching messages do not reset it.
                  sawExpectedBody = true;
              }
          };

          await subscriber.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("A")
              .WithPayload(Encoding.UTF8.GetBytes("The body"))
              .Build();
          await publisher.PublishAsync(outgoing);

          // Fixed pause before the flag is checked.
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.IsTrue(sawExpectedBody);
  }
  /// <summary>
  /// In-memory <c>IMqttServerStorage</c> used by the persistence test. It keeps a
  /// reference to the last list handed to <c>SaveRetainedMessagesAsync</c> and
  /// returns that same list from <c>LoadRetainedMessagesAsync</c>.
  /// </summary>
  private class TestStorage : IMqttServerStorage
  {
      // Starts as an empty list so loading before any save yields no messages.
      private IList<MqttApplicationMessage> _retainedMessages = new List<MqttApplicationMessage>();

      /// <summary>Stores the given list by reference (no copy is made).</summary>
      public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
      {
          _retainedMessages = messages;
          return Task.CompletedTask;
      }

      /// <summary>Returns the most recently saved list as a completed task.</summary>
      public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() =>
          Task.FromResult(_retainedMessages);
  }
  /// <summary>
  /// Shared scenario for the simple publish tests: client "c1" subscribes with
  /// the given filter, client "c2" publishes one empty-payload message, "c1"
  /// unsubscribes, and the number of messages "c1" received is compared with
  /// the expected count after the server has stopped.
  /// </summary>
  /// <param name="topic">Topic the message is published on.</param>
  /// <param name="qualityOfServiceLevel">QoS level of the published message.</param>
  /// <param name="topicFilter">Topic filter used to subscribe and later unsubscribe.</param>
  /// <param name="filterQualityOfServiceLevel">QoS level requested by the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages the subscriber must have received.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount)
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      var deliveredCount = 0;
      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient(server, "c1");
          var publisher = await adapter.ConnectTestClient(server, "c2");

          subscriber.ApplicationMessageReceived += (sender, args) => { deliveredCount++; };

          var subscription = new TopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await subscriber.SubscribeAsync(subscription);

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel)
              .Build();
          await publisher.PublishAsync(outgoing);

          // Fixed pauses around the unsubscribe; delivery is not awaited explicitly.
          await Task.Delay(500);
          await subscriber.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(expectedReceivedMessagesCount, deliveredCount);
  }
  346. }
  347. }