From 3f080e04b9edca721b724362dd9b58741d23d3aa Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 22 Oct 2017 14:44:06 +0200 Subject: [PATCH] Add topic filter builder --- MQTTnet.Core/Server/MqttServerOptions.cs | 2 +- MQTTnet.Core/{Packets => }/TopicFilter.cs | 2 +- MQTTnet.Core/TopicFilterBuilder.cs | 51 +++++++++++++++++++++ README.md | 1 + Tests/MQTTnet.Core.Tests/MqttServerTests.cs | 49 +++++++++++++++++--- Tests/MQTTnet.TestApp.NetCore/ServerTest.cs | 2 + 6 files changed, 99 insertions(+), 8 deletions(-) rename MQTTnet.Core/{Packets => }/TopicFilter.cs (95%) create mode 100644 MQTTnet.Core/TopicFilterBuilder.cs diff --git a/MQTTnet.Core/Server/MqttServerOptions.cs b/MQTTnet.Core/Server/MqttServerOptions.cs index 27dc21c..4c6f7ad 100644 --- a/MQTTnet.Core/Server/MqttServerOptions.cs +++ b/MQTTnet.Core/Server/MqttServerOptions.cs @@ -16,7 +16,7 @@ namespace MQTTnet.Core.Server public Func ConnectionValidator { get; set; } - public Action ApplicationMessageInterceptor { get; set; } + public Func ApplicationMessageInterceptor { get; set; } public IMqttServerStorage Storage { get; set; } } diff --git a/MQTTnet.Core/Packets/TopicFilter.cs b/MQTTnet.Core/TopicFilter.cs similarity index 95% rename from MQTTnet.Core/Packets/TopicFilter.cs rename to MQTTnet.Core/TopicFilter.cs index 6786b1d..3e95afa 100644 --- a/MQTTnet.Core/Packets/TopicFilter.cs +++ b/MQTTnet.Core/TopicFilter.cs @@ -1,6 +1,6 @@ using MQTTnet.Core.Protocol; -namespace MQTTnet.Core.Packets +namespace MQTTnet.Core { public sealed class TopicFilter { diff --git a/MQTTnet.Core/TopicFilterBuilder.cs b/MQTTnet.Core/TopicFilterBuilder.cs new file mode 100644 index 0000000..90114e7 --- /dev/null +++ b/MQTTnet.Core/TopicFilterBuilder.cs @@ -0,0 +1,51 @@ +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Exceptions; + +namespace MQTTnet.Core +{ + public class TopicFilterBuilder + { + private string _topic; + private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + + public TopicFilterBuilder WithTopic(string topic) + { + _topic = topic; + return this; + } + + public TopicFilterBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) + { + _qualityOfServiceLevel = qualityOfServiceLevel; + return this; + } + + public TopicFilterBuilder WithAtLeastOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; + return this; + } + + public TopicFilterBuilder WithAtMostOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + return this; + } + + public TopicFilterBuilder WithExactlyOnceQoS() + { + _qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; + return this; + } + + public TopicFilter Build() + { + if (string.IsNullOrEmpty(_topic)) + { + throw new MqttProtocolViolationException("Topic is not set."); + } + + return new TopicFilter(_topic, _qualityOfServiceLevel); + } + } +} diff --git a/README.md b/README.md index bc7c13b..2bd607a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov * Included core _MqttClient_ with low level functionality * Also included _ManagedMqttClient_ which maintains the connection and subscriptions automatically. Also application messages are queued and re-scheduled for higher QoS levels automatically. * Rx support (via another project) +* Compatible with Microsoft Azure IoT Hub ### Server (broker) * List of connected clients available diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs index 63ef0bb..948497e 100644 --- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs +++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs @@ -1,4 +1,6 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Text; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using MQTTnet.Core.Adapter; @@ -53,7 +55,7 @@ namespace MQTTnet.Core.Tests var s = new MqttServer(new MqttServerOptions(), new List { serverAdapter }, new MqttNetTrace()); await s.StartAsync(); - var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); + var willMessage = new MqttApplicationMessageBuilder().WithTopic("My/last/will").WithAtMostOnceQoS().Build(); var c1 = await serverAdapter.ConnectTestClient(s, "c1"); var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage); @@ -83,7 +85,7 @@ namespace MQTTnet.Core.Tests var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false); + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); await c2.PublishAsync(message); Assert.AreEqual(0, receivedMessagesCount); @@ -118,7 +120,7 @@ namespace MQTTnet.Core.Tests var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false); + var message = new MqttApplicationMessageBuilder().WithTopic("a").WithAtLeastOnceQoS().Build(); await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce)); s.Publish(message); @@ -229,6 +231,41 @@ namespace MQTTnet.Core.Tests Assert.AreEqual(1, receivedMessagesCount); } + [TestMethod] + public async Task MqttServer_InterceptMessage() + { + var options = new MqttServerOptions + { + ApplicationMessageInterceptor = message => + { + message.Payload = Encoding.ASCII.GetBytes("extended"); + return message; + } + }; + + var serverAdapter = new TestMqttServerAdapter(); + var s = new MqttServer(options, new List { serverAdapter }, new MqttNetTrace()); + await s.StartAsync(); + + var c1 = await serverAdapter.ConnectTestClient(s, "c1"); + var c2 = await serverAdapter.ConnectTestClient(s, "c2"); + await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("test").Build()); + + var isIntercepted = false; + c2.ApplicationMessageReceived += (sender, args) => + { + isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; + }; + + var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); + await c1.PublishAsync(m); + await c1.DisconnectAsync(); + + await Task.Delay(500); + + Assert.IsTrue(isIntercepted); + } + private class TestStorage : IMqttServerStorage { private IList _messages = new List(); @@ -262,8 +299,8 @@ namespace MQTTnet.Core.Tests var receivedMessagesCount = 0; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; - await c1.SubscribeAsync(new TopicFilter(topicFilter, filterQualityOfServiceLevel)); - await c2.PublishAsync(new MqttApplicationMessage(topic, new byte[0], qualityOfServiceLevel, false)); + await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); + await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); await Task.Delay(500); await c1.UnsubscribeAsync(topicFilter); diff --git a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs index 639920f..30d9596 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ServerTest.cs @@ -48,6 +48,8 @@ namespace MQTTnet.TestApp.NetCore // based payload with the timestamp is a suitable use case. message.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); } + + return message; }; //var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");