您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

108 行
4.9 KiB

  1. using System.Collections.Concurrent;
  2. using System.Threading.Tasks;
  3. using Microsoft.VisualStudio.TestTools.UnitTesting;
  4. using MQTTnet.Packets;
  5. using MQTTnet.Protocol;
  6. using MQTTnet.Server;
  7. using MQTTnet.Tests.Mockups;
  8. namespace MQTTnet.Tests
  9. {
  10. [TestClass]
  11. public class MqttSubscriptionsManager_Tests
  12. {
  13. [TestMethod]
  14. public async Task MqttSubscriptionsManager_SubscribeSingleSuccess()
  15. {
  16. var s = new MqttClientSession("", new ConcurrentDictionary<object, object>(),
  17. new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger());
  18. var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());
  19. var sp = new MqttSubscribePacket();
  20. sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
  21. await sm.SubscribeAsync(sp, new MqttConnectPacket());
  22. var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce);
  23. Assert.IsTrue(result.IsSubscribed);
  24. Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce);
  25. }
  26. [TestMethod]
  27. public async Task MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
  28. {
  29. var s = new MqttClientSession("", new ConcurrentDictionary<object, object>(),
  30. new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger());
  31. var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());
  32. var sp = new MqttSubscribePacket();
  33. sp.TopicFilters.Add(new TopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  34. await sm.SubscribeAsync(sp, new MqttConnectPacket());
  35. var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce);
  36. Assert.IsTrue(result.IsSubscribed);
  37. Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtMostOnce);
  38. }
  39. [TestMethod]
  40. public async Task MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
  41. {
  42. var s = new MqttClientSession("", new ConcurrentDictionary<object, object>(),
  43. new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger());
  44. var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());
  45. var sp = new MqttSubscribePacket();
  46. sp.TopicFilters.Add(new TopicFilter { Topic = "#", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
  47. sp.TopicFilters.Add(new TopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce });
  48. await sm.SubscribeAsync(sp, new MqttConnectPacket());
  49. var result = sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.ExactlyOnce);
  50. Assert.IsTrue(result.IsSubscribed);
  51. Assert.AreEqual(result.QualityOfServiceLevel, MqttQualityOfServiceLevel.AtLeastOnce);
  52. }
  53. [TestMethod]
  54. public async Task MqttSubscriptionsManager_SubscribeSingleNoSuccess()
  55. {
  56. var s = new MqttClientSession("", new ConcurrentDictionary<object, object>(),
  57. new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger());
  58. var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());
  59. var sp = new MqttSubscribePacket();
  60. sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
  61. await sm.SubscribeAsync(sp, new MqttConnectPacket());
  62. Assert.IsFalse(sm.CheckSubscriptions("A/B/X", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed);
  63. }
  64. [TestMethod]
  65. public async Task MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
  66. {
  67. var s = new MqttClientSession("", new ConcurrentDictionary<object, object>(),
  68. new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions(), new TestLogger());
  69. var sm = new MqttClientSubscriptionsManager(s, new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());
  70. var sp = new MqttSubscribePacket();
  71. sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
  72. await sm.SubscribeAsync(sp, new MqttConnectPacket());
  73. Assert.IsTrue(sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed);
  74. var up = new MqttUnsubscribePacket();
  75. up.TopicFilters.Add("A/B/C");
  76. await sm.UnsubscribeAsync(up);
  77. Assert.IsFalse(sm.CheckSubscriptions("A/B/C", MqttQualityOfServiceLevel.AtMostOnce).IsSubscribed);
  78. }
  79. }
  80. }