Browse Source

Add support for altering the TopicFilter in subscription interceptor.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
1c004ead6d
4 changed files with 58 additions and 15 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +11
    -8
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  3. +1
    -1
      Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs
  4. +45
    -6
      Tests/MQTTnet.Core.Tests/Server_Tests.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -22,6 +22,7 @@
* [Client] Converted option _DualMode_ into nullable boolean to preserve original value and avoid exceptions in IPv4 only networks (thanks to @lavaflo). * [Client] Converted option _DualMode_ into nullable boolean to preserve original value and avoid exceptions in IPv4 only networks (thanks to @lavaflo).
* [Server] Exposed _ClientCertificateRequired_ and _CheckCertificateRevocation_ at TLS options. * [Server] Exposed _ClientCertificateRequired_ and _CheckCertificateRevocation_ at TLS options.
* [Server] Exposed client certificate at client connection validator. * [Server] Exposed client certificate at client connection validator.
* [Server] The subscription interceptor now supports altering the entire topic filter.
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2019</copyright> <copyright>Copyright Christian Kratky 2016-2019</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 11
- 8
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -35,18 +35,21 @@ namespace MQTTnet.Server
CloseConnection = false CloseConnection = false
}; };


foreach (var topicFilter in subscribePacket.TopicFilters)
foreach (var originalTopicFilter in subscribePacket.TopicFilters)
{ {
var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false);
if (!interceptorContext.AcceptSubscription)
var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false);

var finalTopicFilter = interceptorContext.TopicFilter;

if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !interceptorContext.AcceptSubscription)
{ {
result.ResponsePacket.ReturnCodes.Add(MqttSubscribeReturnCode.Failure); result.ResponsePacket.ReturnCodes.Add(MqttSubscribeReturnCode.Failure);
result.ResponsePacket.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError); result.ResponsePacket.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError);
} }
else else
{ {
result.ResponsePacket.ReturnCodes.Add(ConvertToSubscribeReturnCode(topicFilter.QualityOfServiceLevel));
result.ResponsePacket.ReasonCodes.Add(ConvertToSubscribeReasonCode(topicFilter.QualityOfServiceLevel));
result.ResponsePacket.ReturnCodes.Add(ConvertToSubscribeReturnCode(finalTopicFilter.QualityOfServiceLevel));
result.ResponsePacket.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel));
} }


if (interceptorContext.CloseConnection) if (interceptorContext.CloseConnection)
@@ -54,14 +57,14 @@ namespace MQTTnet.Server
result.CloseConnection = true; result.CloseConnection = true;
} }


if (interceptorContext.AcceptSubscription)
if (interceptorContext.AcceptSubscription && !string.IsNullOrEmpty(finalTopicFilter?.Topic))
{ {
lock (_subscriptions) lock (_subscriptions)
{ {
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptions[finalTopicFilter.Topic] = finalTopicFilter.QualityOfServiceLevel;
} }


await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false);
await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, finalTopicFilter).ConfigureAwait(false);
} }
} }




+ 1
- 1
Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet.Server


public string ClientId { get; } public string ClientId { get; }


public TopicFilter TopicFilter { get; }
public TopicFilter TopicFilter { get; set; }
public bool AcceptSubscription { get; set; } = true; public bool AcceptSubscription { get; set; } = true;




+ 45
- 6
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -121,6 +121,45 @@ namespace MQTTnet.Tests
} }
} }


[TestMethod]
public async Task Intercept_Subscription()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor(
c =>
{
// Set the topic to "a" regards what the client wants to subscribe.
c.TopicFilter.Topic = "a";
}));

var topicAReceived = false;
var topicBReceived = false;

var client = await testEnvironment.ConnectClientAsync();
client.UseApplicationMessageReceivedHandler(c =>
{
if (c.ApplicationMessage.Topic == "a")
{
topicAReceived = true;
}
else if (c.ApplicationMessage.Topic == "b")
{
topicBReceived = true;
}
});

await client.SubscribeAsync("b");

await client.PublishAsync("a");

await Task.Delay(500);

Assert.IsTrue(topicAReceived);
Assert.IsFalse(topicBReceived);
}
}

[TestMethod] [TestMethod]
public async Task Subscribe_Unsubscribe() public async Task Subscribe_Unsubscribe()
{ {
@@ -156,7 +195,7 @@ namespace MQTTnet.Tests
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);


var unsubscribeEventCalled = false; var unsubscribeEventCalled = false;
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e =>
{ {
unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1";
}); });
@@ -376,7 +415,7 @@ namespace MQTTnet.Tests
using (var testEnvironment = new TestEnvironment()) using (var testEnvironment = new TestEnvironment())
{ {
var server = await testEnvironment.StartServerAsync(); var server = await testEnvironment.StartServerAsync();
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1"));


var client = await testEnvironment.ConnectClientAsync(); var client = await testEnvironment.ConnectClientAsync();
var receivedMessages = new List<MqttApplicationMessage>(); var receivedMessages = new List<MqttApplicationMessage>();
@@ -689,7 +728,7 @@ namespace MQTTnet.Tests
{ {
await testEnvironment.StartServerAsync( await testEnvironment.StartServerAsync(
new MqttServerOptionsBuilder().WithApplicationMessageInterceptor( new MqttServerOptionsBuilder().WithApplicationMessageInterceptor(
c => { c.ApplicationMessage = new MqttApplicationMessage {Topic = "new_topic" }; }));
c => { c.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; }));


string receivedTopic = null; string receivedTopic = null;
var c1 = await testEnvironment.ConnectClientAsync(); var c1 = await testEnvironment.ConnectClientAsync();
@@ -866,7 +905,7 @@ namespace MQTTnet.Tests


var events = new List<string>(); var events = new List<string>();


server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
{ {
lock (events) lock (events)
{ {
@@ -1050,7 +1089,7 @@ namespace MQTTnet.Tests
var client2 = await testEnvironment.ConnectClientAsync(); var client2 = await testEnvironment.ConnectClientAsync();


var buffer = new StringBuilder(); var buffer = new StringBuilder();
client2.UseApplicationMessageReceivedHandler(c => client2.UseApplicationMessageReceivedHandler(c =>
{ {
lock (buffer) lock (buffer)
@@ -1082,7 +1121,7 @@ namespace MQTTnet.Tests


var clientStatus = await server.GetClientStatusAsync(); var clientStatus = await server.GetClientStatusAsync();
Assert.AreEqual(0, clientStatus.Count); Assert.AreEqual(0, clientStatus.Count);
var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b")); var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b"));
await client2.PublishAsync("x", "1"); await client2.PublishAsync("x", "1");
await client2.PublishAsync("x", "2"); await client2.PublishAsync("x", "2");


Loading…
Cancel
Save