From 1c004ead6d861c4069105493b05a1da91355e26e Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Fri, 7 Jun 2019 20:40:24 +0200 Subject: [PATCH] Add support for altering the TopicFilter in subscription interceptor. --- Build/MQTTnet.nuspec | 1 + .../Server/MqttClientSubscriptionsManager.cs | 19 ++++--- .../MqttSubscriptionInterceptorContext.cs | 2 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 51 ++++++++++++++++--- 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 1c03b6d..f2bb0c1 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -22,6 +22,7 @@ * [Client] Converted option _DualMode_ into nullable boolean to preserve original value and avoid exceptions in IPv4 only networks (thanks to @lavaflo). * [Server] Exposed _ClientCertificateRequired_ and _CheckCertificateRevocation_ at TLS options. * [Server] Exposed client certificate at client connection validator. +* [Server] The subscription interceptor now supports altering the entire topic filter. Copyright Christian Kratky 2016-2019 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 904474f..e2024a6 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -35,18 +35,21 @@ namespace MQTTnet.Server CloseConnection = false }; - foreach (var topicFilter in subscribePacket.TopicFilters) + foreach (var originalTopicFilter in subscribePacket.TopicFilters) { - var interceptorContext = await InterceptSubscribeAsync(topicFilter).ConfigureAwait(false); - if (!interceptorContext.AcceptSubscription) + var interceptorContext = await InterceptSubscribeAsync(originalTopicFilter).ConfigureAwait(false); + + var finalTopicFilter = interceptorContext.TopicFilter; + + if (finalTopicFilter == null || string.IsNullOrEmpty(finalTopicFilter.Topic) || !interceptorContext.AcceptSubscription) { result.ResponsePacket.ReturnCodes.Add(MqttSubscribeReturnCode.Failure); result.ResponsePacket.ReasonCodes.Add(MqttSubscribeReasonCode.UnspecifiedError); } else { - result.ResponsePacket.ReturnCodes.Add(ConvertToSubscribeReturnCode(topicFilter.QualityOfServiceLevel)); - result.ResponsePacket.ReasonCodes.Add(ConvertToSubscribeReasonCode(topicFilter.QualityOfServiceLevel)); + result.ResponsePacket.ReturnCodes.Add(ConvertToSubscribeReturnCode(finalTopicFilter.QualityOfServiceLevel)); + result.ResponsePacket.ReasonCodes.Add(ConvertToSubscribeReasonCode(finalTopicFilter.QualityOfServiceLevel)); } if (interceptorContext.CloseConnection) @@ -54,14 +57,14 @@ namespace MQTTnet.Server result.CloseConnection = true; } - if (interceptorContext.AcceptSubscription) + if (interceptorContext.AcceptSubscription && !string.IsNullOrEmpty(finalTopicFilter?.Topic)) { lock (_subscriptions) { - _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; + _subscriptions[finalTopicFilter.Topic] = finalTopicFilter.QualityOfServiceLevel; } - await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, topicFilter).ConfigureAwait(false); + await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientId, finalTopicFilter).ConfigureAwait(false); } } diff --git a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs index 07b94bb..ca98c95 100644 --- a/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs +++ b/Source/MQTTnet/Server/MqttSubscriptionInterceptorContext.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Server public string ClientId { get; } - public TopicFilter TopicFilter { get; } + public TopicFilter TopicFilter { get; set; } public bool AcceptSubscription { get; set; } = true; diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index 5b34581..31f0847 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -121,6 +121,45 @@ namespace MQTTnet.Tests } } + [TestMethod] + public async Task Intercept_Subscription() + { + using (var testEnvironment = new TestEnvironment()) + { + await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithSubscriptionInterceptor( + c => + { + // Set the topic to "a" regards what the client wants to subscribe. + c.TopicFilter.Topic = "a"; + })); + + var topicAReceived = false; + var topicBReceived = false; + + var client = await testEnvironment.ConnectClientAsync(); + client.UseApplicationMessageReceivedHandler(c => + { + if (c.ApplicationMessage.Topic == "a") + { + topicAReceived = true; + } + else if (c.ApplicationMessage.Topic == "b") + { + topicBReceived = true; + } + }); + + await client.SubscribeAsync("b"); + + await client.PublishAsync("a"); + + await Task.Delay(500); + + Assert.IsTrue(topicAReceived); + Assert.IsFalse(topicBReceived); + } + } + [TestMethod] public async Task Subscribe_Unsubscribe() { @@ -156,7 +195,7 @@ namespace MQTTnet.Tests Assert.AreEqual(1, receivedMessagesCount); var unsubscribeEventCalled = false; - server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => + server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(e => { unsubscribeEventCalled = e.TopicFilter == "a" && e.ClientId == "c1"; }); @@ -376,7 +415,7 @@ namespace MQTTnet.Tests using (var testEnvironment = new TestEnvironment()) { var server = await testEnvironment.StartServerAsync(); - server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1")); + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(e => server.SubscribeAsync(e.ClientId, "topic1")); var client = await testEnvironment.ConnectClientAsync(); var receivedMessages = new List(); @@ -689,7 +728,7 @@ namespace MQTTnet.Tests { await testEnvironment.StartServerAsync( new MqttServerOptionsBuilder().WithApplicationMessageInterceptor( - c => { c.ApplicationMessage = new MqttApplicationMessage {Topic = "new_topic" }; })); + c => { c.ApplicationMessage = new MqttApplicationMessage { Topic = "new_topic" }; })); string receivedTopic = null; var c1 = await testEnvironment.ConnectClientAsync(); @@ -866,7 +905,7 @@ namespace MQTTnet.Tests var events = new List(); - server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => + server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => { lock (events) { @@ -1050,7 +1089,7 @@ namespace MQTTnet.Tests var client2 = await testEnvironment.ConnectClientAsync(); var buffer = new StringBuilder(); - + client2.UseApplicationMessageReceivedHandler(c => { lock (buffer) @@ -1082,7 +1121,7 @@ namespace MQTTnet.Tests var clientStatus = await server.GetClientStatusAsync(); Assert.AreEqual(0, clientStatus.Count); - + var client2 = await testEnvironment.ConnectClientAsync(new MqttClientOptionsBuilder().WithClientId("b")); await client2.PublishAsync("x", "1"); await client2.PublishAsync("x", "2");