You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

452 lines
18 KiB

  1. using Microsoft.VisualStudio.TestTools.UnitTesting;
  2. using MQTTnet.Client;
  3. using MQTTnet.Client.Connecting;
  4. using MQTTnet.Client.Options;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Extensions.ManagedClient;
  8. using MQTTnet.Server;
  9. using MQTTnet.Tests.Mockups;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. namespace MQTTnet.Tests
  16. {
  17. [TestClass]
  18. public class ManagedMqttClient_Tests
  19. {
  20. public TestContext TestContext { get; set; }
  21. [TestMethod]
  22. public async Task Drop_New_Messages_On_Full_Queue()
  23. {
  24. var factory = new MqttFactory();
  25. var managedClient = factory.CreateManagedMqttClient();
  26. try
  27. {
  28. var clientOptions = new ManagedMqttClientOptionsBuilder()
  29. .WithMaxPendingMessages(5)
  30. .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage);
  31. clientOptions.WithClientOptions(o => o.WithTcpServer("localhost"));
  32. await managedClient.StartAsync(clientOptions.Build());
  33. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" });
  34. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "2" });
  35. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "3" });
  36. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "4" });
  37. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "5" });
  38. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "6" });
  39. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "7" });
  40. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "8" });
  41. Assert.AreEqual(5, managedClient.PendingApplicationMessagesCount);
  42. }
  43. finally
  44. {
  45. await managedClient.StopAsync();
  46. }
  47. }
  48. [TestMethod]
  49. public async Task ManagedClients_Will_Message_Send()
  50. {
  51. using (var testEnvironment = new TestEnvironment(TestContext))
  52. {
  53. var receivedMessagesCount = 0;
  54. var factory = new MqttFactory();
  55. await testEnvironment.StartServerAsync();
  56. var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build();
  57. var clientOptions = new MqttClientOptionsBuilder()
  58. .WithTcpServer("localhost", testEnvironment.ServerPort)
  59. .WithWillMessage(willMessage);
  60. var dyingClient = testEnvironment.CreateClient();
  61. var dyingManagedClient = new ManagedMqttClient(dyingClient, testEnvironment.ClientLogger);
  62. await dyingManagedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  63. .WithClientOptions(clientOptions)
  64. .Build());
  65. var recievingClient = await testEnvironment.ConnectClientAsync();
  66. await recievingClient.SubscribeAsync("My/last/will");
  67. recievingClient.UseApplicationMessageReceivedHandler(context => Interlocked.Increment(ref receivedMessagesCount));
  68. dyingManagedClient.Dispose();
  69. await Task.Delay(1000);
  70. Assert.AreEqual(1, receivedMessagesCount);
  71. }
  72. }
  73. [TestMethod]
  74. public async Task Start_Stop()
  75. {
  76. using (var testEnvironment = new TestEnvironment(TestContext))
  77. {
  78. var factory = new MqttFactory();
  79. var server = await testEnvironment.StartServerAsync();
  80. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
  81. var clientOptions = new MqttClientOptionsBuilder()
  82. .WithTcpServer("localhost", testEnvironment.ServerPort);
  83. var connected = GetConnectedTask(managedClient);
  84. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  85. .WithClientOptions(clientOptions)
  86. .Build());
  87. await connected;
  88. await managedClient.StopAsync();
  89. await Task.Delay(500);
  90. Assert.AreEqual(0, (await server.GetClientStatusAsync()).Count);
  91. }
  92. }
  93. [TestMethod]
  94. public async Task Storage_Queue_Drains()
  95. {
  96. using (var testEnvironment = new TestEnvironment(TestContext))
  97. {
  98. testEnvironment.IgnoreClientLogErrors = true;
  99. testEnvironment.IgnoreServerLogErrors = true;
  100. var factory = new MqttFactory();
  101. var server = await testEnvironment.StartServerAsync();
  102. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
  103. var clientOptions = new MqttClientOptionsBuilder()
  104. .WithTcpServer("localhost", testEnvironment.ServerPort);
  105. var storage = new ManagedMqttClientTestStorage();
  106. var connected = GetConnectedTask(managedClient);
  107. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  108. .WithClientOptions(clientOptions)
  109. .WithStorage(storage)
  110. .WithAutoReconnectDelay(System.TimeSpan.FromSeconds(5))
  111. .Build());
  112. await connected;
  113. await testEnvironment.Server.StopAsync();
  114. await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" });
  115. //Message should have been added to the storage queue in PublishAsync,
  116. //and we are awaiting PublishAsync, so the message should already be
  117. //in storage at this point (i.e. no waiting).
  118. Assert.AreEqual(1, storage.GetMessageCount());
  119. connected = GetConnectedTask(managedClient);
  120. await testEnvironment.Server.StartAsync(new MqttServerOptionsBuilder()
  121. .WithDefaultEndpointPort(testEnvironment.ServerPort).Build());
  122. await connected;
  123. //Wait 500ms here so the client has time to publish the queued message
  124. await Task.Delay(500);
  125. Assert.AreEqual(0, storage.GetMessageCount());
  126. await managedClient.StopAsync();
  127. }
  128. }
  129. [TestMethod]
  130. public async Task Subscriptions_And_Unsubscriptions_Are_Made_And_Reestablished_At_Reconnect()
  131. {
  132. using (var testEnvironment = new TestEnvironment(TestContext))
  133. {
  134. var unmanagedClient = testEnvironment.CreateClient();
  135. var managedClient = await CreateManagedClientAsync(testEnvironment, unmanagedClient);
  136. var received = SetupReceivingOfMessages(managedClient, 2);
  137. // Perform some opposing subscriptions and unsubscriptions to verify
  138. // that these conflicting subscriptions are handled correctly
  139. await managedClient.SubscribeAsync("keptSubscribed");
  140. await managedClient.SubscribeAsync("subscribedThenUnsubscribed");
  141. await managedClient.UnsubscribeAsync("subscribedThenUnsubscribed");
  142. await managedClient.UnsubscribeAsync("unsubscribedThenSubscribed");
  143. await managedClient.SubscribeAsync("unsubscribedThenSubscribed");
  144. //wait a bit for the subscriptions to become established before the messages are published
  145. await Task.Delay(500);
  146. var sendingClient = await testEnvironment.ConnectClientAsync();
  147. async Task PublishMessages()
  148. {
  149. await sendingClient.PublishAsync("keptSubscribed", new byte[] { 1 });
  150. await sendingClient.PublishAsync("subscribedThenUnsubscribed", new byte[] { 1 });
  151. await sendingClient.PublishAsync("unsubscribedThenSubscribed", new byte[] { 1 });
  152. }
  153. await PublishMessages();
  154. async Task AssertMessagesReceived()
  155. {
  156. var messages = await received;
  157. Assert.AreEqual("keptSubscribed", messages[0].Topic);
  158. Assert.AreEqual("unsubscribedThenSubscribed", messages[1].Topic);
  159. }
  160. await AssertMessagesReceived();
  161. var connected = GetConnectedTask(managedClient);
  162. await unmanagedClient.DisconnectAsync();
  163. // the managed client has to reconnect by itself
  164. await connected;
  165. // wait a bit so that the managed client can reestablish the subscriptions
  166. await Task.Delay(500);
  167. received = SetupReceivingOfMessages(managedClient, 2);
  168. await PublishMessages();
  169. // and then the same subscriptions need to exist again
  170. await AssertMessagesReceived();
  171. }
  172. }
  173. // This case also serves as a regression test for the previous behavior which re-published
  174. // each and every existing subscriptions with every new subscription that was made
  175. // (causing performance problems and having the visible symptom of retained messages being received again)
  176. [TestMethod]
  177. public async Task Subscriptions_Subscribe_Only_New_Subscriptions()
  178. {
  179. using (var testEnvironment = new TestEnvironment(TestContext))
  180. {
  181. var managedClient = await CreateManagedClientAsync(testEnvironment);
  182. var sendingClient = await testEnvironment.ConnectClientAsync();
  183. await managedClient.SubscribeAsync("topic");
  184. //wait a bit for the subscription to become established
  185. await Task.Delay(500);
  186. await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", Payload = new byte[] { 1 }, Retain = true });
  187. var messages = await SetupReceivingOfMessages(managedClient, 1);
  188. Assert.AreEqual(1, messages.Count);
  189. Assert.AreEqual("topic", messages.Single().Topic);
  190. await managedClient.SubscribeAsync("anotherTopic");
  191. await Task.Delay(500);
  192. // The subscription of the other topic must not trigger a re-subscription of the existing topic
  193. // (and thus renewed receiving of the retained message)
  194. Assert.AreEqual(1, messages.Count);
  195. }
  196. }
  197. // This case also serves as a regression test for the previous behavior
  198. // that subscriptions were only published at the ConnectionCheckInterval
  199. [TestMethod]
  200. public async Task Subscriptions_Are_Published_Immediately()
  201. {
  202. using (var testEnvironment = new TestEnvironment(TestContext))
  203. {
  204. // Use a long connection check interval to verify that the subscriptions
  205. // do not depend on the connection check interval anymore
  206. var connectionCheckInterval = TimeSpan.FromSeconds(10);
  207. var managedClient = await CreateManagedClientAsync(testEnvironment, null, connectionCheckInterval);
  208. var sendingClient = await testEnvironment.ConnectClientAsync();
  209. await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", Payload = new byte[] { 1 }, Retain = true });
  210. await managedClient.SubscribeAsync("topic");
  211. var subscribeTime = DateTime.UtcNow;
  212. var messages = await SetupReceivingOfMessages(managedClient, 1);
  213. var elapsed = DateTime.UtcNow - subscribeTime;
  214. Assert.IsTrue(elapsed < TimeSpan.FromSeconds(1), $"Subscriptions must be activated immediately, this one took {elapsed}");
  215. Assert.AreEqual(messages.Single().Topic, "topic");
  216. }
  217. }
  218. [TestMethod]
  219. public async Task Subscriptions_Are_Cleared_At_Logout()
  220. {
  221. using (var testEnvironment = new TestEnvironment(TestContext))
  222. {
  223. await testEnvironment.StartServerAsync().ConfigureAwait(false);
  224. var sendingClient = await testEnvironment.ConnectClientAsync().ConfigureAwait(false);
  225. await sendingClient.PublishAsync(new MqttApplicationMessage
  226. {
  227. Topic = "topic",
  228. Payload = new byte[] { 1 },
  229. Retain = true
  230. });
  231. // Wait a bit for the retained message to be available
  232. await Task.Delay(500);
  233. await sendingClient.DisconnectAsync();
  234. // Now use the managed client and check if subscriptions get cleared properly.
  235. var clientOptions = new MqttClientOptionsBuilder()
  236. .WithTcpServer("localhost", testEnvironment.ServerPort);
  237. var managedOptions = new ManagedMqttClientOptionsBuilder()
  238. .WithClientOptions(clientOptions)
  239. .Build();
  240. var receivedManagedMessages = new List<MqttApplicationMessage>();
  241. var managedClient = new ManagedMqttClient(testEnvironment.CreateClient(), new MqttNetLogger());
  242. managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c =>
  243. {
  244. receivedManagedMessages.Add(c.ApplicationMessage);
  245. });
  246. await managedClient.SubscribeAsync("topic");
  247. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  248. .WithClientOptions(clientOptions)
  249. .WithAutoReconnectDelay(TimeSpan.FromSeconds(1))
  250. .Build());
  251. await Task.Delay(500);
  252. Assert.AreEqual(1, receivedManagedMessages.Count);
  253. await managedClient.StopAsync();
  254. await Task.Delay(500);
  255. await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder()
  256. .WithClientOptions(clientOptions)
  257. .WithAutoReconnectDelay(TimeSpan.FromSeconds(1))
  258. .Build());
  259. await Task.Delay(1000);
  260. // After reconnect and then some delay, the retained message must not be received,
  261. // showing that the subscriptions were cleared
  262. Assert.AreEqual(1, receivedManagedMessages.Count);
  263. // Make sure that it gets received after subscribing again.
  264. await managedClient.SubscribeAsync("topic");
  265. await Task.Delay(500);
  266. Assert.AreEqual(2, receivedManagedMessages.Count);
  267. }
  268. }
  269. private async Task<ManagedMqttClient> CreateManagedClientAsync(
  270. TestEnvironment testEnvironment,
  271. IMqttClient underlyingClient = null,
  272. TimeSpan? connectionCheckInterval = null)
  273. {
  274. await testEnvironment.StartServerAsync();
  275. var clientOptions = new MqttClientOptionsBuilder()
  276. .WithTcpServer("localhost", testEnvironment.ServerPort);
  277. var managedOptions = new ManagedMqttClientOptionsBuilder()
  278. .WithClientOptions(clientOptions)
  279. .Build();
  280. // Use a short connection check interval so that subscription operations are performed quickly
  281. // in order to verify against a previous implementation that performed subscriptions only
  282. // at connection check intervals
  283. managedOptions.ConnectionCheckInterval = connectionCheckInterval ?? TimeSpan.FromSeconds(0.1);
  284. var managedClient =
  285. new ManagedMqttClient(underlyingClient ?? testEnvironment.CreateClient(), new MqttNetLogger());
  286. var connected = GetConnectedTask(managedClient);
  287. await managedClient.StartAsync(managedOptions);
  288. await connected;
  289. return managedClient;
  290. }
  291. /// <summary>
  292. /// Returns a task that will finish when the <paramref name="managedClient"/> has connected
  293. /// </summary>
  294. private Task GetConnectedTask(ManagedMqttClient managedClient)
  295. {
  296. TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>();
  297. managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
  298. {
  299. managedClient.ConnectedHandler = null;
  300. connected.SetResult(true);
  301. });
  302. return connected.Task;
  303. }
  304. /// <summary>
  305. /// Returns a task that will return the messages received on <paramref name="managedClient"/>
  306. /// when <paramref name="expectedNumberOfMessages"/> have been received
  307. /// </summary>
  308. private Task<List<MqttApplicationMessage>> SetupReceivingOfMessages(ManagedMqttClient managedClient, int expectedNumberOfMessages)
  309. {
  310. var receivedMessages = new List<MqttApplicationMessage>();
  311. var allReceived = new TaskCompletionSource<List<MqttApplicationMessage>>();
  312. managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(r =>
  313. {
  314. receivedMessages.Add(r.ApplicationMessage);
  315. if (receivedMessages.Count == expectedNumberOfMessages)
  316. {
  317. allReceived.SetResult(receivedMessages);
  318. }
  319. });
  320. return allReceived.Task;
  321. }
  322. }
  323. public class ManagedMqttClientTestStorage : IManagedMqttClientStorage
  324. {
  325. private IList<ManagedMqttApplicationMessage> _messages = null;
  326. public Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
  327. {
  328. if (_messages == null)
  329. {
  330. _messages = new List<ManagedMqttApplicationMessage>();
  331. }
  332. return Task.FromResult(_messages);
  333. }
  334. public Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
  335. {
  336. _messages = messages;
  337. return Task.FromResult(0);
  338. }
  339. public int GetMessageCount()
  340. {
  341. return _messages.Count;
  342. }
  343. }
  344. }