No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

537 líneas
19 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Diagnostics;
  4. using MQTTnet.Protocol;
  5. using MQTTnet.Server;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using MQTTnet.Implementations;
  12. namespace MQTTnet.Core.Tests
  13. {
  14. [TestClass]
  15. public class MqttServerTests
  16. {
  /// <summary>
  /// Publishes one message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.AtMostOnce"/>
  /// to a subscriber whose filter "A/B/C" uses the same level, and expects exactly one
  /// message to be received.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtMostOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion inside the helper
      // then surfaces directly rather than wrapped in an AggregateException, and the
      // test matches the other async tests in this class.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtMostOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.AtLeastOnce"/>
  /// to a subscriber whose filter "A/B/C" uses the same level, and expects exactly one
  /// message to be received.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_AtLeastOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion inside the helper
      // then surfaces directly rather than wrapped in an AggregateException, and the
      // test matches the other async tests in this class.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.AtLeastOnce,
          1);
  }
  /// <summary>
  /// Publishes one message on "A/B/C" with <see cref="MqttQualityOfServiceLevel.ExactlyOnce"/>
  /// to a subscriber whose filter "A/B/C" uses the same level, and expects exactly one
  /// message to be received.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PublishSimple_ExactlyOnce()
  {
      // Awaited instead of blocked on with Wait(): a failing assertion inside the helper
      // then surfaces directly rather than wrapped in an AggregateException, and the
      // test matches the other async tests in this class.
      await TestPublishAsync(
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          "A/B/C",
          MqttQualityOfServiceLevel.ExactlyOnce,
          1);
  }
  /// <summary>
  /// Connects "c2" with a will message on "My/last/will", disconnects it through
  /// <c>DisconnectAsync</c>, and asserts that "c1" (subscribed to "#") received nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_WillMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var lastWill = new MqttApplicationMessageBuilder()
              .WithTopic("My/last/will")
              .WithAtMostOnceQoS()
              .Build();

          var listener = await adapter.ConnectTestClient("c1");
          var willOwner = await adapter.ConnectTestClient("c2", lastWill);

          listener.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var catchAll = new TopicFilterBuilder().WithTopic("#").Build();
          await listener.SubscribeAsync(catchAll);

          await willOwner.DisconnectAsync();

          // Leave time for anything the server might still dispatch before checking.
          await Task.Delay(1000);

          await listener.DisconnectAsync();
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Walks "c1" through three phases on topic "a" while "c2" publishes the same message
  /// each time: not subscribed (nothing received), subscribed (one message received and
  /// the server's subscribe event raised), and unsubscribed again (count stays at one
  /// and the server's unsubscribe event raised).
  /// </summary>
  [TestMethod]
  public async Task MqttServer_SubscribeUnsubscribe()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");
          subscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var probe = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          // Phase 1: no subscription yet, so the publish must not reach the subscriber.
          await publisher.PublishAsync(probe);
          await Task.Delay(1000);
          Assert.AreEqual(0, deliveredCount);

          // Phase 2: subscribe; the server event must fire and the next publish must arrive.
          var sawSubscribe = false;
          server.ClientSubscribedTopic += (sender, args) =>
          {
              sawSubscribe = args.TopicFilter.Topic == "a" && args.ClientId == "c1";
          };

          await subscriber.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await Task.Delay(500);
          Assert.IsTrue(sawSubscribe, "Subscribe event not called.");

          await publisher.PublishAsync(probe);
          await Task.Delay(500);
          Assert.AreEqual(1, deliveredCount);

          // Phase 3: unsubscribe; the server event must fire and the count must not grow.
          var sawUnsubscribe = false;
          server.ClientUnsubscribedTopic += (sender, args) =>
          {
              sawUnsubscribe = args.TopicFilter == "a" && args.ClientId == "c1";
          };

          await subscriber.UnsubscribeAsync("a");
          await Task.Delay(500);
          Assert.IsTrue(sawUnsubscribe, "Unsubscribe event not called.");

          await publisher.PublishAsync(probe);
          await Task.Delay(1000);
          Assert.AreEqual(1, deliveredCount);
      }
      finally
      {
          await server.StopAsync();
      }

      // Re-check after shutdown that nothing was delivered late.
      await Task.Delay(500);
      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Publishes a message on topic "a" through the server object itself and asserts
  /// that the subscribed client "c1" receives it exactly once.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          subscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          var filter = new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce);
          await subscriber.SubscribeAsync(filter);

          // The server, not a client, is the publisher in this test.
          await server.PublishAsync(outgoing);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Starts a server on the TCP adapter, connects two clients to "localhost", subscribes
  /// both to topic "a", publishes 1000 messages from the first client, and expects
  /// 2000 deliveries in total across the two clients.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Publish_MultipleClients()
  {
      var tcpAdapter = new MqttTcpServerAdapter(new MqttNetLogger().CreateChildLogger());
      var server = new MqttFactory().CreateMqttServer(new[] { tcpAdapter }, new MqttNetLogger());

      var totalDelivered = 0;
      var gate = new object();

      // Both receive handlers funnel through this counter; the lock guards the shared total.
      void CountDelivery()
      {
          lock (gate)
          {
              totalDelivered++;
          }
      }

      // Each client gets its own separately built options instance.
      var firstOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      var secondOptions = new MqttClientOptionsBuilder()
          .WithTcpServer("localhost")
          .Build();

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var first = new MqttFactory().CreateMqttClient();
          var second = new MqttFactory().CreateMqttClient();

          await first.ConnectAsync(firstOptions);
          await second.ConnectAsync(secondOptions);

          first.ApplicationMessageReceived += (sender, args) => CountDelivery();
          second.ApplicationMessageReceived += (sender, args) => CountDelivery();

          var outgoing = new MqttApplicationMessageBuilder()
              .WithTopic("a")
              .WithAtLeastOnceQoS()
              .Build();

          await first.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
          await second.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));

          // Only the first client publishes; both subscribers are expected to get every message.
          await Publish(first, outgoing);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(2000, totalDelivered);
  }
  /// <summary>
  /// Publishes <paramref name="message"/> 1000 times through <paramref name="c1"/>,
  /// awaiting each publish before starting the next.
  /// </summary>
  /// <param name="c1">Client used to publish.</param>
  /// <param name="message">Message sent on every iteration.</param>
  private static async Task Publish(IMqttClient c1, MqttApplicationMessage message)
  {
      var remaining = 1000;
      while (remaining > 0)
      {
          await c1.PublishAsync(message);
          remaining--;
      }
  }
  /// <summary>
  /// Publishes a retained message on topic "r" from "c1", then has "c2" unsubscribe and
  /// re-subscribe to "r" five times, asserting that each new subscription delivers
  /// exactly one more message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessagesFlow()
  {
      var retainedMessage = new MqttApplicationMessageBuilder().WithTopic("r").WithPayload("r").WithRetainFlag().Build();
      var serverAdapter = new TestMqttServerAdapter();
      var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

      // try/finally so the server is always stopped, even when an assertion fails;
      // this mirrors the other tests in this class.
      try
      {
          await s.StartAsync(new MqttServerOptions());

          var c1 = await serverAdapter.ConnectTestClient("c1");
          await c1.PublishAsync(retainedMessage);

          // Task.Delay instead of Thread.Sleep: do not block a thread inside an async test.
          await Task.Delay(500);
          await c1.DisconnectAsync();
          await Task.Delay(500);

          var receivedMessages = 0;
          var c2 = await serverAdapter.ConnectTestClient("c2");
          c2.ApplicationMessageReceived += (_, e) =>
          {
              receivedMessages++;
          };

          for (var i = 0; i < 5; i++)
          {
              // Unsubscribing must not deliver anything...
              await c2.UnsubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i, receivedMessages);

              // ...while every fresh subscription delivers the retained message once.
              await c2.SubscribeAsync("r");
              await Task.Delay(500);
              Assert.AreEqual(i + 1, receivedMessages);
          }

          await c2.DisconnectAsync();
      }
      finally
      {
          await s.StopAsync();
      }
  }
  /// <summary>
  /// Publishes a message on topic "retained" without the retain flag, then connects a
  /// second client that subscribes to that topic and asserts it receives nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_NoRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          // The publisher sends a three-byte payload with no retain flag, then leaves.
          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[3]));
          await publisher.DisconnectAsync();

          // The late subscriber joins only after the publish has happened.
          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Publishes a message on topic "retained" with the retain flag set, then connects a
  /// second client that subscribes to that topic and asserts it receives one message.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_RetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var flaggedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(server, flaggedMessage);
          await publisher.DisconnectAsync();

          // The late subscriber joins only after the publisher has disconnected.
          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Publishes a retained message on topic "retained" followed by a second retained
  /// message with an empty payload on the same topic, then asserts that a client
  /// subscribing afterwards receives nothing.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_ClearRetainedMessage()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var publisher = await adapter.ConnectTestClient("c1");

          // First a retained message with content, then a retained message with a zero-length payload.
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag());
          await publisher.PublishAsync(b => b.WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag());
          await publisher.DisconnectAsync();

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          await Task.Delay(200);

          var filter = new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce);
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(0, deliveredCount);
  }
  /// <summary>
  /// Runs two server instances one after the other against the same
  /// <see cref="TestStorage"/>: the first receives a retained message (which must end
  /// up in the storage), the second must deliver it to a newly subscribing client.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_PersistRetainedMessage()
  {
      var sharedStorage = new TestStorage();
      var adapter = new TestMqttServerAdapter();

      // First server lifetime: publish the retained message.
      var firstServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      try
      {
          var firstOptions = new MqttServerOptions { Storage = sharedStorage };
          await firstServer.StartAsync(firstOptions);

          var flaggedMessage = new MqttApplicationMessageBuilder()
              .WithTopic("retained")
              .WithPayload(new byte[3])
              .WithRetainFlag()
              .Build();

          var publisher = await adapter.ConnectTestClient("c1");
          await publisher.PublishAndWaitForAsync(firstServer, flaggedMessage);
          await publisher.DisconnectAsync();
      }
      finally
      {
          await firstServer.StopAsync();
      }

      Assert.AreEqual(1, sharedStorage.Messages.Count);

      // Second server lifetime: same adapter and storage, new server instance.
      var secondServer = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;
      try
      {
          var secondOptions = new MqttServerOptions { Storage = sharedStorage };
          await secondServer.StartAsync(secondOptions);

          var lateSubscriber = await adapter.ConnectTestClient("c2");
          lateSubscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var filter = new TopicFilterBuilder().WithTopic("retained").Build();
          await lateSubscriber.SubscribeAsync(filter);

          await Task.Delay(500);
      }
      finally
      {
          await secondServer.StopAsync();
      }

      Assert.AreEqual(1, deliveredCount);
  }
  /// <summary>
  /// Registers an application message interceptor that overwrites the payload with
  /// "extended" and asserts that a subscriber on topic "test" sees that payload.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_InterceptMessage()
  {
      // Replaces the payload of every message passed to the interceptor.
      void Interceptor(MqttApplicationMessageInterceptorContext context) =>
          context.ApplicationMessage.Payload = Encoding.ASCII.GetBytes("extended");

      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());

      try
      {
          var serverOptions = new MqttServerOptions { ApplicationMessageInterceptor = Interceptor };
          await server.StartAsync(serverOptions);

          var publisher = await adapter.ConnectTestClient("c1");
          var subscriber = await adapter.ConnectTestClient("c2");

          var filter = new TopicFilterBuilder().WithTopic("test").Build();
          await subscriber.SubscribeAsync(filter);

          var payloadWasReplaced = false;
          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              payloadWasReplaced = string.Compare("extended", receivedText, StringComparison.Ordinal) == 0;
          };

          // Published without any payload; the only content can come from the interceptor.
          await publisher.PublishAsync(b => b.WithTopic("test"));
          await publisher.DisconnectAsync();

          await Task.Delay(500);

          Assert.IsTrue(payloadWasReplaced);
      }
      finally
      {
          await server.StopAsync();
      }
  }
  /// <summary>
  /// Publishes the UTF-8 text "The body" on topic "A" from "c2" and asserts that the
  /// subscriber "c1" receives a message whose payload decodes to the same text.
  /// </summary>
  [TestMethod]
  public async Task MqttServer_Body()
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var sawExpectedBody = false;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");

          subscriber.ApplicationMessageReceived += (sender, args) =>
          {
              var receivedText = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
              if (receivedText != "The body")
              {
                  return;
              }

              // Once set, the flag is never cleared by later messages.
              sawExpectedBody = true;
          };

          await subscriber.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);
          await publisher.PublishAsync(b => b.WithTopic("A").WithPayload(Encoding.UTF8.GetBytes("The body")));

          await Task.Delay(1000);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.IsTrue(sawExpectedBody);
  }
  /// <summary>
  /// In-memory <see cref="IMqttServerStorage"/> used by the tests: it keeps a reference
  /// to the last list handed to <see cref="SaveRetainedMessagesAsync"/> and returns
  /// that same list from <see cref="LoadRetainedMessagesAsync"/>.
  /// </summary>
  private class TestStorage : IMqttServerStorage
  {
      /// <summary>Most recently saved messages; starts as an empty list.</summary>
      public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();

      /// <summary>Returns the currently held list in an already completed task.</summary>
      public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() => Task.FromResult(Messages);

      /// <summary>Replaces the held list with <paramref name="messages"/> (the reference is stored, not copied).</summary>
      public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
      {
          Messages = messages;
          return Task.CompletedTask;
      }
  }
  /// <summary>
  /// Shared publish/subscribe scenario: "c1" subscribes with the given filter, "c2"
  /// publishes one empty-payload message on the given topic, "c1" unsubscribes, and the
  /// number of messages "c1" received is compared with the expected count.
  /// </summary>
  /// <param name="topic">Topic the message is published on.</param>
  /// <param name="qualityOfServiceLevel">Quality of service level of the published message.</param>
  /// <param name="topicFilter">Topic filter "c1" subscribes to and later unsubscribes from.</param>
  /// <param name="filterQualityOfServiceLevel">Quality of service level of the subscription.</param>
  /// <param name="expectedReceivedMessagesCount">Number of messages "c1" is expected to receive.</param>
  private static async Task TestPublishAsync(
      string topic,
      MqttQualityOfServiceLevel qualityOfServiceLevel,
      string topicFilter,
      MqttQualityOfServiceLevel filterQualityOfServiceLevel,
      int expectedReceivedMessagesCount)
  {
      var adapter = new TestMqttServerAdapter();
      var server = new MqttFactory().CreateMqttServer(new[] { adapter }, new MqttNetLogger());
      var deliveredCount = 0;

      try
      {
          await server.StartAsync(new MqttServerOptions());

          var subscriber = await adapter.ConnectTestClient("c1");
          var publisher = await adapter.ConnectTestClient("c2");

          subscriber.ApplicationMessageReceived += (sender, args) => deliveredCount++;

          var subscription = new TopicFilterBuilder()
              .WithTopic(topicFilter)
              .WithQualityOfServiceLevel(filterQualityOfServiceLevel)
              .Build();
          await subscriber.SubscribeAsync(subscription);

          await publisher.PublishAsync(b => b
              .WithTopic(topic)
              .WithPayload(new byte[0])
              .WithQualityOfServiceLevel(qualityOfServiceLevel));

          await Task.Delay(500);
          await subscriber.UnsubscribeAsync(topicFilter);
          await Task.Delay(500);
      }
      finally
      {
          await server.StopAsync();
      }

      Assert.AreEqual(expectedReceivedMessagesCount, deliveredCount);
  }
  429. }
  430. }