Explorar el Código

Fix QoS handling in server

release/3.x.x
Christian Kratky hace 7 años
padre
commit
5bcda12ca9
Se han modificado 4 ficheros con 42 adiciones y 21 borrados
  1. +11
    -0
      MQTTnet.Core/Server/CheckSubscriptionsResult.cs
  2. +3
    -1
      MQTTnet.Core/Server/MqttClientSession.cs
  3. +21
    -13
      MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
  4. +7
    -7
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs

+ 11
- 0
MQTTnet.Core/Server/CheckSubscriptionsResult.cs Ver fichero

@@ -0,0 +1,11 @@
using MQTTnet.Core.Protocol;

namespace MQTTnet.Core.Server
{
public class CheckSubscriptionsResult
{
public bool IsSubscribed { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }
}
}

+ 3
- 1
MQTTnet.Core/Server/MqttClientSession.cs Ver fichero

@@ -110,11 +110,13 @@ namespace MQTTnet.Core.Server
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

if (!_subscriptionsManager.IsSubscribed(publishPacket))
var result = _subscriptionsManager.CheckSubscriptions(publishPacket);
if (!result.IsSubscribed)
{
return;
}

publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;
_pendingMessagesQueue.Enqueue(publishPacket);
}



+ 21
- 13
MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs Ver fichero

@@ -8,7 +8,7 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientSubscriptionsManager
{
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscribedTopics = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly MqttServerOptions _options;

public MqttClientSubscriptionsManager(IOptions<MqttServerOptions> options)
@@ -23,7 +23,7 @@ namespace MQTTnet.Core.Server
var responsePacket = subscribePacket.CreateResponse<MqttSubAckPacket>();
var closeConnection = false;

lock (_subscribedTopics)
lock (_subscriptions)
{
foreach (var topicFilter in subscribePacket.TopicFilters)
{
@@ -38,7 +38,7 @@ namespace MQTTnet.Core.Server

if (interceptorContext.AcceptSubscription)
{
_subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
}
}
}
@@ -54,40 +54,48 @@ namespace MQTTnet.Core.Server
{
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));

lock (_subscribedTopics)
lock (_subscriptions)
{
foreach (var topicFilter in unsubscribePacket.TopicFilters)
{
_subscribedTopics.Remove(topicFilter);
_subscriptions.Remove(topicFilter);
}
}

return unsubscribePacket.CreateResponse<MqttUnsubAckPacket>();
}

public bool IsSubscribed(MqttPublishPacket publishPacket)
public CheckSubscriptionsResult CheckSubscriptions(MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

lock (_subscribedTopics)
lock (_subscriptions)
{
foreach (var subscribedTopic in _subscribedTopics)
foreach (var subscription in _subscriptions)
{
if (publishPacket.QualityOfServiceLevel > subscribedTopic.Value)
if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscription.Key))
{
continue;
}

if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key))
var effectiveQos = subscription.Value;
if (publishPacket.QualityOfServiceLevel < effectiveQos)
{
continue;
effectiveQos = publishPacket.QualityOfServiceLevel;
}

return true;
return new CheckSubscriptionsResult
{
IsSubscribed = true,
QualityOfServiceLevel = effectiveQos
};
}
}

return false;
return new CheckSubscriptionsResult
{
IsSubscribed = false
};
}
}
}

+ 7
- 7
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs Ver fichero

@@ -15,7 +15,7 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
sp.TopicFilters.Add(new TopicFilter("A/B/C"));

sm.Subscribe(sp, "");

@@ -25,7 +25,7 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsTrue(sm.IsSubscribed(pp));
Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed);
}

[TestMethod]
@@ -34,7 +34,7 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
sp.TopicFilters.Add(new TopicFilter("A/B/C"));

sm.Subscribe(sp, "");

@@ -44,7 +44,7 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsFalse(sm.IsSubscribed(pp));
Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed);
}

[TestMethod]
@@ -53,7 +53,7 @@ namespace MQTTnet.Core.Tests
var sm = new MqttClientSubscriptionsManager(new OptionsWrapper<MqttServerOptions>(new MqttServerOptions()));

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter("A/B/C", MqttQualityOfServiceLevel.AtMostOnce));
sp.TopicFilters.Add(new TopicFilter("A/B/C"));

sm.Subscribe(sp, "");

@@ -63,13 +63,13 @@ namespace MQTTnet.Core.Tests
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};

Assert.IsTrue(sm.IsSubscribed(pp));
Assert.IsTrue(sm.CheckSubscriptions(pp).IsSubscribed);

var up = new MqttUnsubscribePacket();
up.TopicFilters.Add("A/B/C");
sm.Unsubscribe(up);

Assert.IsFalse(sm.IsSubscribed(pp));
Assert.IsFalse(sm.CheckSubscriptions(pp).IsSubscribed);
}
}
}

Cargando…
Cancelar
Guardar