You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

509 lines
20 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Microsoft.VisualStudio.TestTools.UnitTesting;
  7. using MQTTnet.Client;
  8. using MQTTnet.Client.Connecting;
  9. using MQTTnet.Client.Options;
  10. using MQTTnet.Client.Receiving;
  11. using MQTTnet.Diagnostics;
  12. using MQTTnet.Diagnostics.Logger;
  13. using MQTTnet.Extensions.ManagedClient;
  14. using MQTTnet.Server;
  15. using MQTTnet.Tests.Mockups;
  16. namespace MQTTnet.Tests.Client
  17. {
  18. [TestClass]
  19. public class ManagedMqttClient_Tests
  20. {
  21. public TestContext TestContext { get; set; }
  22. [TestMethod]
  23. public async Task Drop_New_Messages_On_Full_Queue()
  24. {
  25. var factory = new MqttFactory();
  26. var managedClient = factory.CreateManagedMqttClient();
  27. try
  28. {
  29. var clientOptions = new ManagedMqttClientOptionsBuilder()
  30. .WithMaxPendingMessages(5)
  31. .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage);
  32. clientOptions.WithClientOptions(o => o.WithTcpServer("localhost"));
  33. await managedClient.StartAsync(clientOptions.Build());
  34. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" });
  35. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "2" });
  36. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "3" });
  37. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "4" });
  38. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "5" });
  39. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "6" });
  40. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "7" });
  41. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "8" });
  42. Assert.AreEqual(5, managedClient.PendingApplicationMessagesCount);
  43. }
  44. finally
  45. {
  46. await managedClient.StopAsync();
  47. }
  48. }
  49. [TestMethod]
  50. public async Task ManagedClients_Will_Message_Send()
  51. {
  52. using (var testEnvironment = new TestEnvironment(TestContext))
  53. {
  54. var receivedMessagesCount = 0;
  55. await testEnvironment.StartServer();
  56. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  57. var clientOptions = new MqttClientOptionsBuilder()
  58. .WithTcpServer("localhost", testEnvironment.ServerPort)
  59. .WithWillMessage(willMessage);
  60. var dyingClient = testEnvironment.CreateClient();
  61. var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger);
  62. await dyingManagedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  63. .WithClientOptions(clientOptions)
  64. .Build());
  65. var recievingClient = await testEnvironment.ConnectClient();
  66. await recievingClient.SubscribeAsync("My/last/will");
  67. recievingClient.UseApplicationMessageReceivedHandler(context => Interlocked.Increment(ref receivedMessagesCount));
  68. dyingManagedClient.Dispose();
  69. await Task.Delay(1000);
  70. Assert.AreEqual(1, receivedMessagesCount);
  71. }
  72. }
  73. [TestMethod]
  74. public async Task Start_Stop()
  75. {
  76. using (var testEnvironment = new TestEnvironment(TestContext))
  77. {
  78. var server = await testEnvironment.StartServer();
  79. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetEventLogger());
  80. var clientOptions = new MqttClientOptionsBuilder()
  81. .WithTcpServer("localhost", testEnvironment.ServerPort);
  82. var connected = GetConnectedTask(managedClient);
  83. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  84. .WithClientOptions(clientOptions)
  85. .Build());
  86. await connected;
  87. await managedClient.StopAsync();
  88. await Task.Delay(500);
  89. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  90. }
  91. }
  92. [TestMethod]
  93. public async Task Storage_Queue_Drains()
  94. {
  95. using (var testEnvironment = new TestEnvironment(TestContext))
  96. {
  97. testEnvironment.IgnoreClientLogErrors = true;
  98. testEnvironment.IgnoreServerLogErrors = true;
  99. await testEnvironment.StartServer();
  100. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetEventLogger());
  101. var clientOptions = new MqttClientOptionsBuilder()
  102. .WithTcpServer("localhost", testEnvironment.ServerPort);
  103. var storage = new ManagedMqttClientTestStorage();
  104. var connected = GetConnectedTask(managedClient);
  105. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  106. .WithClientOptions(clientOptions)
  107. .WithStorage(storage)
  108. .WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5))
  109. .Build());
  110. await connected;
  111. await testEnvironment.Server.StopAsync();
  112. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" });
  113. //Message should have been added to the storage queue in PublishAsync,
  114. //and we are awaiting PublishAsync, so the message should already be
  115. //in storage at this point (i.e. no waiting).
  116. Assert.AreEqual(1, storage.GetMessageCount());
  117. connected = GetConnectedTask(managedClient);
  118. await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder()
  119. .WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  120. await connected;
  121. //Wait 500ms here so the client has time to publish the queued message
  122. await Task.Delay(500);
  123. Assert.AreEqual(0, storage.GetMessageCount());
  124. await managedClient.StopAsync();
  125. }
  126. }
  127. [TestMethod]
  128. public async Task Subscriptions_And_Unsubscriptions_Are_Made_And_Reestablished_At_Reconnect()
  129. {
  130. using (var testEnvironment = new TestEnvironment(TestContext))
  131. {
  132. var unmanagedClient = testEnvironment.CreateClient();
  133. var managedClient = await CreateManagedClientAsync(testEnvironment, unmanagedClient);
  134. var received = SetupReceivingOfMessages(managedClient, 2);
  135. // Perform some opposing subscriptions and unsubscriptions to verify
  136. // that these conflicting subscriptions are handled correctly
  137. await managedClient.SubscribeAsync("keptSubscribed");
  138. await managedClient.SubscribeAsync("subscribedThenUnsubscribed");
  139. await managedClient.UnsubscribeAsync("subscribedThenUnsubscribed");
  140. await managedClient.UnsubscribeAsync("unsubscribedThenSubscribed");
  141. await managedClient.SubscribeAsync("unsubscribedThenSubscribed");
  142. //wait a bit for the subscriptions to become established before the messages are published
  143. await Task.Delay(500);
  144. var sendingClient = await testEnvironment.ConnectClient();
  145. async Task PublishMessages()
  146. {
  147. await sendingClient.PublishAsync("keptSubscribed", new byte[] { 1 });
  148. await sendingClient.PublishAsync("subscribedThenUnsubscribed", new byte[] { 1 });
  149. await sendingClient.PublishAsync("unsubscribedThenSubscribed", new byte[] { 1 });
  150. }
  151. await PublishMessages();
  152. async Task AssertMessagesReceived()
  153. {
  154. var messages = await received;
  155. Assert.AreEqual("keptSubscribed", messages[0].Topic);
  156. Assert.AreEqual("unsubscribedThenSubscribed", messages[1].Topic);
  157. }
  158. await AssertMessagesReceived();
  159. var connected = GetConnectedTask(managedClient);
  160. await unmanagedClient.DisconnectAsync();
  161. // the managed client has to reconnect by itself
  162. await connected;
  163. // wait a bit so that the managed client can reestablish the subscriptions
  164. await Task.Delay(500);
  165. received = SetupReceivingOfMessages(managedClient, 2);
  166. await PublishMessages();
  167. // and then the same subscriptions need to exist again
  168. await AssertMessagesReceived();
  169. }
  170. }
  171. // This case also serves as a regression test for the previous behavior which re-published
  172. // each and every existing subscriptions with every new subscription that was made
  173. // (causing performance problems and having the visible symptom of retained messages being received again)
  174. [TestMethod]
  175. public async Task Subscriptions_Subscribe_Only_New_Subscriptions()
  176. {
  177. using (var testEnvironment = new TestEnvironment(TestContext))
  178. {
  179. var managedClient = await CreateManagedClientAsync(testEnvironment);
  180. var sendingClient = await testEnvironment.ConnectClient();
  181. await managedClient.SubscribeAsync("topic");
  182. //wait a bit for the subscription to become established
  183. await Task.Delay(500);
  184. await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", Payload = new byte[] { 1 }, Retain = true });
  185. var messages = await SetupReceivingOfMessages(managedClient, 1);
  186. Assert.AreEqual(1, messages.Count);
  187. Assert.AreEqual("topic", messages.Single().Topic);
  188. await managedClient.SubscribeAsync("anotherTopic");
  189. await Task.Delay(500);
  190. // The subscription of the other topic must not trigger a re-subscription of the existing topic
  191. // (and thus renewed receiving of the retained message)
  192. Assert.AreEqual(1, messages.Count);
  193. }
  194. }
  195. // This case also serves as a regression test for the previous behavior
  196. // that subscriptions were only published at the ConnectionCheckInterval
  197. [TestMethod]
  198. public async Task Subscriptions_Are_Published_Immediately()
  199. {
  200. using (var testEnvironment = new TestEnvironment(TestContext))
  201. {
  202. // Use a long connection check interval to verify that the subscriptions
  203. // do not depend on the connection check interval anymore
  204. var connectionCheckInterval = TimeSpan.FromSeconds(10);
  205. var managedClient = await CreateManagedClientAsync(testEnvironment, null, connectionCheckInterval);
  206. var sendingClient = await testEnvironment.ConnectClient();
  207. await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", Payload = new byte[] { 1 }, Retain = true });
  208. var subscribeTime = DateTime.UtcNow;
  209. var messagesTask = SetupReceivingOfMessages(managedClient, 1);
  210. await managedClient.SubscribeAsync("topic");
  211. var messages = await messagesTask;
  212. var elapsed = DateTime.UtcNow - subscribeTime;
  213. Assert.IsTrue(elapsed < TimeSpan.FromSeconds(1), $"Subscriptions must be activated immediately, this one took {elapsed}");
  214. Assert.AreEqual(messages.Single().Topic, "topic");
  215. }
  216. }
  217. [TestMethod]
  218. public async Task Subscriptions_Are_Cleared_At_Logout()
  219. {
  220. using (var testEnvironment = new TestEnvironment(TestContext))
  221. {
  222. await testEnvironment.StartServer().ConfigureAwait(false);
  223. var sendingClient = await testEnvironment.ConnectClient().ConfigureAwait(false);
  224. await sendingClient.PublishAsync(new MqttApplicationMessage
  225. {
  226. Topic = "topic",
  227. Payload = new byte[] { 1 },
  228. Retain = true
  229. });
  230. // Wait a bit for the retained message to be available
  231. await Task.Delay(1000);
  232. await sendingClient.DisconnectAsync();
  233. // Now use the managed client and check if subscriptions get cleared properly.
  234. var clientOptions = new MqttClientOptionsBuilder()
  235. .WithTcpServer("127.0.0.1", testEnvironment.ServerPort);
  236. var receivedManagedMessages = new List<MqttApplicationMessage>();
  237. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetEventLogger());
  238. managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c =>
  239. {
  240. receivedManagedMessages.Add(c.ApplicationMessage);
  241. });
  242. await managedClient.SubscribeAsync("topic");
  243. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  244. .WithClientOptions(clientOptions)
  245. .WithAutoReconnectDelay(TimeSpan.FromSeconds(1))
  246. .Build());
  247. await Task.Delay(1000);
  248. Assert.AreEqual(1, receivedManagedMessages.Count);
  249. await managedClient.StopAsync();
  250. await Task.Delay(500);
  251. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  252. .WithClientOptions(clientOptions)
  253. .WithAutoReconnectDelay(TimeSpan.FromSeconds(1))
  254. .Build());
  255. await Task.Delay(1000);
  256. // After reconnect and then some delay, the retained message must not be received,
  257. // showing that the subscriptions were cleared
  258. Assert.AreEqual(1, receivedManagedMessages.Count);
  259. // Make sure that it gets received after subscribing again.
  260. await managedClient.SubscribeAsync("topic");
  261. await Task.Delay(500);
  262. Assert.AreEqual(2, receivedManagedMessages.Count);
  263. }
  264. }
  265. [TestMethod]
  266. public async Task Manage_Session_MaxParallel_Subscribe()
  267. {
  268. using (var testEnvironment = new TestEnvironment())
  269. {
  270. testEnvironment.IgnoreClientLogErrors = true;
  271. var serverOptions = new MqttServerOptionsBuilder();
  272. await testEnvironment.StartServer(serverOptions);
  273. var options = new MqttClientOptionsBuilder().WithClientId("1").WithKeepAlivePeriod(TimeSpan.FromSeconds(5));
  274. var hasReceive = false;
  275. void OnReceive()
  276. {
  277. if (!hasReceive)
  278. {
  279. hasReceive = true;
  280. }
  281. }
  282. var clients = await Task.WhenAll(Enumerable.Range(0, 25)
  283. .Select(i => TryConnect_Subscribe(testEnvironment, options, OnReceive)));
  284. var connectedClients = clients.Where(c => c?.IsConnected ?? false).ToList();
  285. Assert.AreEqual(1, connectedClients.Count);
  286. await Task.Delay(10000); // over 1.5T
  287. var option2 = new MqttClientOptionsBuilder().WithClientId("2").WithKeepAlivePeriod(TimeSpan.FromSeconds(10));
  288. var sendClient = await testEnvironment.ConnectClient(option2);
  289. await sendClient.PublishAsync("aaa", "1");
  290. await Task.Delay(3000);
  291. Assert.AreEqual(true, hasReceive);
  292. }
  293. }
  294. private async Task<IMqttClient> TryConnect_Subscribe(TestEnvironment testEnvironment, MqttClientOptionsBuilder options, Action onReceive)
  295. {
  296. try
  297. {
  298. var sendClient = await testEnvironment.ConnectClient(options);
  299. sendClient.ApplicationMessageReceivedHandler = new MQTTnet.Client.Receiving.MqttApplicationMessageReceivedHandlerDelegate(e =>
  300. {
  301. onReceive();
  302. });
  303. await sendClient.SubscribeAsync("aaa");
  304. return sendClient;
  305. }
  306. catch (System.Exception)
  307. {
  308. return null;
  309. }
  310. }
  311. async Task<ManagedMqttClient> CreateManagedClientAsync(
  312. TestEnvironment testEnvironment,
  313. IMqttClient underlyingClient = null,
  314. TimeSpan? connectionCheckInterval = null)
  315. {
  316. await testEnvironment.StartServer();
  317. var clientOptions = new MqttClientOptionsBuilder()
  318. .WithTcpServer("localhost", testEnvironment.ServerPort);
  319. var managedOptions = new ManagedMqttClientOptionsBuilder()
  320. .WithClientOptions(clientOptions)
  321. .Build();
  322. // Use a short connection check interval so that subscription operations are performed quickly
  323. // in order to verify against a previous implementation that performed subscriptions only
  324. // at connection check intervals
  325. managedOptions.ConnectionCheckInterval = connectionCheckInterval ?? TimeSpan.FromSeconds(0.1);
  326. var managedClient = new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetEventLogger());
  327. var connected = GetConnectedTask(managedClient);
  328. await managedClient.StartAsync(managedOptions);
  329. await connected;
  330. return managedClient;
  331. }
  332. /// <summary>
  333. /// Returns a task that will finish when the <paramref name="managedClient"/> has connected
  334. /// </summary>
  335. Task GetConnectedTask(ManagedMqttClient managedClient)
  336. {
  337. TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>();
  338. managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
  339. {
  340. managedClient.ConnectedHandler = null;
  341. connected.SetResult(true);
  342. });
  343. return connected.Task;
  344. }
  345. /// <summary>
  346. /// Returns a task that will return the messages received on <paramref name="managedClient"/>
  347. /// when <paramref name="expectedNumberOfMessages"/> have been received
  348. /// </summary>
  349. Task<List<MqttApplicationMessage>> SetupReceivingOfMessages(ManagedMqttClient managedClient, int expectedNumberOfMessages)
  350. {
  351. var receivedMessages = new List<MqttApplicationMessage>();
  352. var result = new TaskCompletionSource<List<MqttApplicationMessage>>(TaskCreationOptions.RunContinuationsAsynchronously);
  353. managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(r =>
  354. {
  355. receivedMessages.Add(r.ApplicationMessage);
  356. if (receivedMessages.Count == expectedNumberOfMessages)
  357. {
  358. result.TrySetResult(receivedMessages);
  359. }
  360. });
  361. return result.Task;
  362. }
  363. }
  364. public class ManagedMqttClientTestStorage : IManagedMqttClientStorage
  365. {
  366. IList<ManagedMqttApplicationMessage> _messages;
  367. public Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
  368. {
  369. if (_messages == null)
  370. {
  371. _messages = new List<ManagedMqttApplicationMessage>();
  372. }
  373. return Task.FromResult(_messages);
  374. }
  375. public Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
  376. {
  377. _messages = messages;
  378. return Task.FromResult(0);
  379. }
  380. public int GetMessageCount()
  381. {
  382. return _messages.Count;
  383. }
  384. }
  385. }