diff --git a/MQTTnet.Core/Server/CheckSubscriptionsResult.cs b/MQTTnet.Core/Server/CheckSubscriptionsResult.cs new file mode 100644 index 0000000..625457f --- /dev/null +++ b/MQTTnet.Core/Server/CheckSubscriptionsResult.cs @@ -0,0 +1,11 @@ +using MQTTnet.Core.Protocol; + +namespace MQTTnet.Core.Server +{ + public class CheckSubscriptionsResult + { + public bool IsSubscribed { get; set; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } + } +} diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 42451dd..ad1416e 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -110,11 +110,13 @@ namespace MQTTnet.Core.Server { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - if (!_subscriptionsManager.IsSubscribed(publishPacket)) + var result = _subscriptionsManager.CheckSubscriptions(publishPacket); + if (!result.IsSubscribed) { return; } + publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel; _pendingMessagesQueue.Enqueue(publishPacket); } diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs index 1b0d6c4..5af3a2d 100644 --- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs +++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Core.Server { public sealed class MqttClientSubscriptionsManager { - private readonly Dictionary _subscribedTopics = new Dictionary(); + private readonly Dictionary _subscriptions = new Dictionary(); private readonly MqttServerOptions _options; public MqttClientSubscriptionsManager(IOptions options) @@ -23,7 +23,7 @@ namespace MQTTnet.Core.Server var responsePacket = subscribePacket.CreateResponse(); var closeConnection = false; - lock (_subscribedTopics) + lock (_subscriptions) { foreach (var topicFilter in subscribePacket.TopicFilters) { @@ -38,7 +38,7 @@ namespace MQTTnet.Core.Server if (interceptorContext.AcceptSubscription) { - _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; } } } @@ -54,40 +54,48 @@ namespace MQTTnet.Core.Server { if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); - lock (_subscribedTopics) + lock (_subscriptions) { foreach (var topicFilter in unsubscribePacket.TopicFilters) { - _subscribedTopics.Remove(topicFilter); + _subscriptions.Remove(topicFilter); } } return unsubscribePacket.CreateResponse(); } - public bool IsSubscribed(MqttPublishPacket publishPacket) + public CheckSubscriptionsResult CheckSubscriptions(MqttPublishPacket publishPacket) { if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); - lock (_subscribedTopics) + lock (_subscriptions) { - foreach (var subscribedTopic in _subscribedTopics) + foreach (var subscription in _subscriptions) { - if (publishPacket.QualityOfServiceLevel > subscribedTopic.Value) + if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscription.Key)) { continue; } - if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key)) + var effectiveQos = subscription.Value; + if (publishPacket.QualityOfServiceLevel < effectiveQos) { - continue; + effectiveQos = publishPacket.QualityOfServiceLevel; } - return true; + return new CheckSubscriptionsResult + { + IsSubscribed = true, + QualityOfServiceLevel = effectiveQos + }; } } - return false; + return new CheckSubscriptionsResult + { + IsSubscribed = false + }; } } } diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs index d46354c..44bad99 100644 --- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs @@ -15,7 +15,7 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); + sp.TopicFilters.Add(new TopicFilter("A/B/C")); sm.Subscribe(sp, ""); @@ -25,7 +25,7 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.IsSubscribed(pp)); + Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed); } [TestMethod] @@ -34,7 +34,7 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); + sp.TopicFilters.Add(new TopicFilter("A/B/C")); sm.Subscribe(sp, ""); @@ -44,7 +44,7 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsFalse(sm.IsSubscribed(pp)); + Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed); } [TestMethod] @@ -53,7 +53,7 @@ namespace MQTTnet.Core.Tests var sm = new MqttClientSubscriptionsManager(new OptionsWrapper(new MqttServerOptions())); var sp = new MqttSubscribePacket(); - sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce)); + sp.TopicFilters.Add(new TopicFilter("A/B/C")); sm.Subscribe(sp, ""); @@ -63,13 +63,13 @@ namespace MQTTnet.Core.Tests QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce }; - Assert.IsTrue(sm.IsSubscribed(pp)); + Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed); var up = new MqttUnsubscribePacket(); up.TopicFilters.Add("A/B/C"); sm.Unsubscribe(up); - Assert.IsFalse(sm.IsSubscribed(pp)); + Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed); } } }