You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

440 lines
15 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Microsoft.VisualStudio.TestTools.UnitTesting;
  7. using MQTTnet.Core.Adapter;
  8. using MQTTnet.Core.Client;
  9. using MQTTnet.Core.Protocol;
  10. using MQTTnet.Core.Server;
  11. using Microsoft.Extensions.DependencyInjection;
  12. using MQTTnet.Core.Packets;
  13. namespace MQTTnet.Core.Tests
  14. {
  15. [TestClass]
  16. public class MqttServerTests
  17. {
  18. [TestMethod]
  19. public void MqttServer_PublishSimple_AtMostOnce()
  20. {
  21. TestPublishAsync(
  22. "A/B/C",
  23. MqttQualityOfServiceLevel.AtMostOnce,
  24. "A/B/C",
  25. MqttQualityOfServiceLevel.AtMostOnce,
  26. 1).Wait();
  27. }
  28. [TestMethod]
  29. public void MqttServer_PublishSimple_AtLeastOnce()
  30. {
  31. TestPublishAsync(
  32. "A/B/C",
  33. MqttQualityOfServiceLevel.AtLeastOnce,
  34. "A/B/C",
  35. MqttQualityOfServiceLevel.AtLeastOnce,
  36. 1).Wait();
  37. }
  38. [TestMethod]
  39. public void MqttServer_PublishSimple_ExactlyOnce()
  40. {
  41. TestPublishAsync(
  42. "A/B/C",
  43. MqttQualityOfServiceLevel.ExactlyOnce,
  44. "A/B/C",
  45. MqttQualityOfServiceLevel.ExactlyOnce,
  46. 1).Wait();
  47. }
  48. [TestMethod]
  49. public async Task MqttServer_WillMessage()
  50. {
  51. var serverAdapter = new TestMqttServerAdapter();
  52. var services = new ServiceCollection()
  53. .AddLogging()
  54. .AddMqttServer()
  55. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  56. .BuildServiceProvider();
  57. var s = new MqttFactory(services).CreateMqttServer();
  58. var receivedMessagesCount = 0;
  59. try
  60. {
  61. await s.StartAsync();
  62. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  63. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  64. var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage);
  65. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  66. await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
  67. await c2.DisconnectAsync();
  68. await Task.Delay(1000);
  69. }
  70. finally
  71. {
  72. await s.StopAsync();
  73. }
  74. Assert.AreEqual(1, receivedMessagesCount);
  75. }
  76. [TestMethod]
  77. public async Task MqttServer_Unsubscribe()
  78. {
  79. var serverAdapter = new TestMqttServerAdapter();
  80. var services = new ServiceCollection()
  81. .AddLogging()
  82. .AddMqttServer()
  83. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  84. .BuildServiceProvider();
  85. var s = new MqttFactory(services).CreateMqttServer();
  86. var receivedMessagesCount = 0;
  87. try
  88. {
  89. await s.StartAsync();
  90. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  91. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  92. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  93. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  94. await c2.PublishAsync(message);
  95. Assert.AreEqual(0, receivedMessagesCount);
  96. await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
  97. await c2.PublishAsync(message);
  98. await Task.Delay(500);
  99. Assert.AreEqual(1, receivedMessagesCount);
  100. await c1.UnsubscribeAsync("a");
  101. await c2.PublishAsync(message);
  102. await Task.Delay(500);
  103. Assert.AreEqual(1, receivedMessagesCount);
  104. }
  105. finally
  106. {
  107. await s.StopAsync();
  108. }
  109. await Task.Delay(500);
  110. Assert.AreEqual(1, receivedMessagesCount);
  111. }
  112. [TestMethod]
  113. public async Task MqttServer_Publish()
  114. {
  115. var serverAdapter = new TestMqttServerAdapter();
  116. var services = new ServiceCollection()
  117. .AddLogging()
  118. .AddMqttServer()
  119. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  120. .BuildServiceProvider();
  121. var s = new MqttFactory(services).CreateMqttServer();
  122. var receivedMessagesCount = 0;
  123. try
  124. {
  125. await s.StartAsync();
  126. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  127. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  128. var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build();
  129. await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
  130. s.PublishAsync(message).Wait();
  131. await Task.Delay(500);
  132. }
  133. finally
  134. {
  135. await s.StopAsync();
  136. }
  137. Assert.AreEqual(1, receivedMessagesCount);
  138. }
  139. [TestMethod]
  140. public async Task MqttServer_NoRetainedMessage()
  141. {
  142. var serverAdapter = new TestMqttServerAdapter();
  143. var services = new ServiceCollection()
  144. .AddLogging()
  145. .AddMqttServer()
  146. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  147. .BuildServiceProvider();
  148. var s = new MqttFactory(services).CreateMqttServer();
  149. var receivedMessagesCount = 0;
  150. try
  151. {
  152. await s.StartAsync();
  153. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  154. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build());
  155. await c1.DisconnectAsync();
  156. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  157. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  158. await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
  159. await Task.Delay(500);
  160. }
  161. finally
  162. {
  163. await s.StopAsync();
  164. }
  165. Assert.AreEqual(0, receivedMessagesCount);
  166. }
  167. [TestMethod]
  168. public async Task MqttServer_RetainedMessage()
  169. {
  170. var serverAdapter = new TestMqttServerAdapter();
  171. var services = new ServiceCollection()
  172. .AddLogging()
  173. .AddMqttServer()
  174. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  175. .BuildServiceProvider();
  176. var s = new MqttFactory(services).CreateMqttServer();
  177. var retainMessagemanager = services.GetRequiredService<IMqttClientRetainedMessageManager>();
  178. var receivedMessagesCount = 0;
  179. try
  180. {
  181. await s.StartAsync();
  182. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  183. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  184. await c1.DisconnectAsync();
  185. var subscribe = new MqttSubscribePacket()
  186. {
  187. TopicFilters = new List<TopicFilter>()
  188. {
  189. new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)
  190. }
  191. };
  192. //make shure the retainedMessageManagerreceived the package
  193. while (!retainMessagemanager.GetMessages(subscribe).Any())
  194. {
  195. await Task.Delay(TimeSpan.FromMilliseconds(10));
  196. }
  197. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  198. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  199. await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
  200. await Task.Delay(500);
  201. }
  202. finally
  203. {
  204. await s.StopAsync();
  205. }
  206. Assert.AreEqual(1, receivedMessagesCount);
  207. }
  208. [TestMethod]
  209. public async Task MqttServer_ClearRetainedMessage()
  210. {
  211. var serverAdapter = new TestMqttServerAdapter();
  212. var services = new ServiceCollection()
  213. .AddLogging()
  214. .AddMqttServer()
  215. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  216. .BuildServiceProvider();
  217. var s = new MqttFactory(services).CreateMqttServer();
  218. var receivedMessagesCount = 0;
  219. try
  220. {
  221. await s.StartAsync();
  222. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  223. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  224. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
  225. await c1.DisconnectAsync();
  226. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  227. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  228. await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
  229. await Task.Delay(500);
  230. }
  231. finally
  232. {
  233. await s.StopAsync();
  234. }
  235. Assert.AreEqual(0, receivedMessagesCount);
  236. }
  237. [TestMethod]
  238. public async Task MqttServer_PersistRetainedMessage()
  239. {
  240. var storage = new TestStorage();
  241. var serverAdapter = new TestMqttServerAdapter();
  242. var services = new ServiceCollection()
  243. .AddLogging()
  244. .AddMqttServer()
  245. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  246. .BuildServiceProvider();
  247. var s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage);
  248. try
  249. {
  250. await s.StartAsync();
  251. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  252. await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
  253. await c1.DisconnectAsync();
  254. }
  255. finally
  256. {
  257. await s.StopAsync();
  258. }
  259. s = new MqttFactory(services).CreateMqttServer(options => options.Storage = storage);
  260. var receivedMessagesCount = 0;
  261. try
  262. {
  263. await s.StartAsync();
  264. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  265. c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  266. await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
  267. await Task.Delay(500);
  268. }
  269. finally
  270. {
  271. await s.StopAsync();
  272. }
  273. Assert.AreEqual(1, receivedMessagesCount);
  274. }
  275. [TestMethod]
  276. public async Task MqttServer_InterceptMessage()
  277. {
  278. void Interceptor(MqttApplicationMessageInterceptorContext context)
  279. {
  280. context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");
  281. }
  282. var serverAdapter = new TestMqttServerAdapter();
  283. var services = new ServiceCollection()
  284. .AddLogging()
  285. .AddMqttServer()
  286. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  287. .BuildServiceProvider();
  288. var s = new MqttFactory(services).CreateMqttServer(options => options.ApplicationMessageInterceptor = Interceptor);
  289. try
  290. {
  291. await s.StartAsync();
  292. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  293. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  294. await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build());
  295. var isIntercepted = false;
  296. c2.ApplicationMessageReceived += (sender, args) =>
  297. {
  298. isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0;
  299. };
  300. var m = new MqttApplicationMessageBuilder().WithTopic("test").Build();
  301. await c1.PublishAsync(m);
  302. await c1.DisconnectAsync();
  303. await Task.Delay(500);
  304. Assert.IsTrue(isIntercepted);
  305. }
  306. finally
  307. {
  308. await s.StopAsync();
  309. }
  310. }
  311. private class TestStorage : IMqttServerStorage
  312. {
  313. private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();
  314. public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
  315. {
  316. _messages = messages;
  317. return Task.CompletedTask;
  318. }
  319. public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
  320. {
  321. return Task.FromResult(_messages);
  322. }
  323. }
  324. private static async Task TestPublishAsync(
  325. string topic,
  326. MqttQualityOfServiceLevel qualityOfServiceLevel,
  327. string topicFilter,
  328. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  329. int expectedReceivedMessagesCount)
  330. {
  331. var serverAdapter = new TestMqttServerAdapter();
  332. var services = new ServiceCollection()
  333. .AddMqttServer()
  334. .AddLogging()
  335. .AddSingleton<IMqttServerAdapter>(serverAdapter)
  336. .BuildServiceProvider();
  337. var s = services.GetRequiredService<IMqttServer>();
  338. var receivedMessagesCount = 0;
  339. try
  340. {
  341. await s.StartAsync();
  342. var c1 = await serverAdapter.ConnectTestClient(s, "c1");
  343. var c2 = await serverAdapter.ConnectTestClient(s, "c2");
  344. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  345. await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());
  346. await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
  347. await Task.Delay(500);
  348. await c1.UnsubscribeAsync(topicFilter);
  349. await Task.Delay(500);
  350. }
  351. finally
  352. {
  353. await s.StopAsync();
  354. }
  355. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  356. }
  357. }
  358. }