You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MqttServerTests.cs 3.8 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. using System.Collections.Generic;
  2. using System.Threading.Tasks;
  3. using Microsoft.VisualStudio.TestTools.UnitTesting;
  4. using MQTTnet.Core.Adapter;
  5. using MQTTnet.Core.Client;
  6. using MQTTnet.Core.Packets;
  7. using MQTTnet.Core.Protocol;
  8. using MQTTnet.Core.Server;
  9. namespace MQTTnet.Core.Tests
  10. {
  11. [TestClass]
  12. public class MqttServerTests
  13. {
  14. [TestMethod]
  15. public async Task MqttServer_PublishSimple_AtMostOnce()
  16. {
  17. await TestPublishAsync(
  18. "A/B/C",
  19. MqttQualityOfServiceLevel.AtMostOnce,
  20. "A/B/C",
  21. MqttQualityOfServiceLevel.AtMostOnce,
  22. 1);
  23. }
  24. [TestMethod]
  25. public async Task MqttServer_PublishSimple_AtLeastOnce()
  26. {
  27. await TestPublishAsync(
  28. "A/B/C",
  29. MqttQualityOfServiceLevel.AtLeastOnce,
  30. "A/B/C",
  31. MqttQualityOfServiceLevel.AtLeastOnce,
  32. 1);
  33. }
  34. [TestMethod]
  35. public async Task MqttServer_PublishSimple_ExactlyOnce()
  36. {
  37. await TestPublishAsync(
  38. "A/B/C",
  39. MqttQualityOfServiceLevel.ExactlyOnce,
  40. "A/B/C",
  41. MqttQualityOfServiceLevel.ExactlyOnce,
  42. 1);
  43. }
  44. [TestMethod]
  45. public async Task MqttServer_WillMessage()
  46. {
  47. var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
  48. s.Start();
  49. var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false);
  50. var c1 = ConnectTestClient("c1", null, s);
  51. var c2 = ConnectTestClient("c2", willMessage, s);
  52. var receivedMessagesCount = 0;
  53. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  54. await c1.SubscribeAsync(new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce));
  55. await c2.DisconnectAsync();
  56. await Task.Delay(1000);
  57. s.Stop();
  58. Assert.AreEqual(1, receivedMessagesCount);
  59. }
  60. private MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server)
  61. {
  62. var adapterA = new TestMqttCommunicationAdapter();
  63. var adapterB = new TestMqttCommunicationAdapter();
  64. adapterA.Partner = adapterB;
  65. adapterB.Partner = adapterA;
  66. var client = new MqttClient(new MqttClientOptions(), adapterA);
  67. server.InjectClient(clientId, adapterB);
  68. client.ConnectAsync(willMessage).Wait();
  69. return client;
  70. }
  71. private async Task TestPublishAsync(
  72. string topic,
  73. MqttQualityOfServiceLevel qualityOfServiceLevel,
  74. string topicFilter,
  75. MqttQualityOfServiceLevel filterQualityOfServiceLevel,
  76. int expectedReceivedMessagesCount)
  77. {
  78. var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
  79. s.Start();
  80. var c1 = ConnectTestClient("c1", null, s);
  81. var c2 = ConnectTestClient("c2", null, s);
  82. var receivedMessagesCount = 0;
  83. c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
  84. await c1.SubscribeAsync(new TopicFilter(topicFilter, filterQualityOfServiceLevel));
  85. await c2.PublishAsync(new MqttApplicationMessage(topic, new byte[0], qualityOfServiceLevel, false));
  86. await Task.Delay(500);
  87. await c1.Unsubscribe(topicFilter);
  88. await Task.Delay(500);
  89. s.Stop();
  90. Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
  91. }
  92. }
  93. }