diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index c025bdf..8696f83 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -5,6 +5,8 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Client; using MQTTnet.Exceptions; +using MQTTnet.Extensions.Rpc.Options; +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; using MQTTnet.Protocol; namespace MQTTnet.Extensions.Rpc @@ -13,11 +15,18 @@ namespace MQTTnet.Extensions.Rpc { private readonly ConcurrentDictionary> _waitingCalls = new ConcurrentDictionary>(); private readonly IMqttClient _mqttClient; + private readonly IMqttRpcClientOptions _options; private readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler; - public MqttRpcClient(IMqttClient mqttClient) + [Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")] + public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions()) + { + } + + public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options) { _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); + _options = options ?? throw new ArgumentNullException(nameof(options)); _applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler( mqttClient.ApplicationMessageReceivedHandler, @@ -55,8 +64,26 @@ namespace MQTTnet.Extensions.Rpc throw new InvalidOperationException("The application message received handler was modified."); } - var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}"; - var responseTopic = requestTopic + "/response"; + var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext + { + MethodName = methodName, + QualityOfServiceLevel = qualityOfServiceLevel, + MqttClient = _mqttClient, + Options = _options + }); + + var requestTopic = topicNames.RequestTopic; + var responseTopic = topicNames.ResponseTopic; + + if (string.IsNullOrWhiteSpace(requestTopic)) + { + throw new MqttProtocolViolationException("RPC request topic is empty."); + } + + if (string.IsNullOrWhiteSpace(responseTopic)) + { + throw new MqttProtocolViolationException("RPC response topic is empty."); + } var requestMessage = new MqttApplicationMessageBuilder() .WithTopic(requestTopic) diff --git a/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs b/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs new file mode 100644 index 0000000..6b7afa9 --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs @@ -0,0 +1,9 @@ +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; + +namespace MQTTnet.Extensions.Rpc.Options +{ + public interface IMqttRpcClientOptions + { + IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs new file mode 100644 index 0000000..0dc4517 --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs @@ -0,0 +1,9 @@ +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; + +namespace MQTTnet.Extensions.Rpc.Options +{ + public class MqttRpcClientOptions : IMqttRpcClientOptions + { + public IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } = new DefaultMqttRpcClientTopicGenerationStrategy(); + } +} diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs new file mode 100644 index 0000000..69277ac --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs @@ -0,0 +1,25 @@ +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; +using System; + +namespace MQTTnet.Extensions.Rpc.Options +{ + public class MqttRpcClientOptionsBuilder + { + IMqttRpcClientTopicGenerationStrategy _topicGenerationStrategy = new DefaultMqttRpcClientTopicGenerationStrategy(); + + public MqttRpcClientOptionsBuilder WithTopicGenerationStrategy(IMqttRpcClientTopicGenerationStrategy value) + { + _topicGenerationStrategy = value ?? throw new ArgumentNullException(nameof(value)); + + return this; + } + + public IMqttRpcClientOptions Build() + { + return new MqttRpcClientOptions + { + TopicGenerationStrategy = _topicGenerationStrategy + }; + } + } +} diff --git a/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs new file mode 100644 index 0000000..12bccce --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Extensions.Rpc.Options +{ + public class MqttRpcTopicPair + { + public string RequestTopic { get; set; } + + public string ResponseTopic { get; set; } + } +} diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs new file mode 100644 index 0000000..d38aa0b --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs @@ -0,0 +1,20 @@ +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; +using System; + +namespace MQTTnet.Extensions.Rpc.Options +{ + public class DefaultMqttRpcClientTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy + { + public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) + { + var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}"; + var responseTopic = requestTopic + "/response"; + + return new MqttRpcTopicPair + { + RequestTopic = requestTopic, + ResponseTopic = responseTopic + }; + } + } +} diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs new file mode 100644 index 0000000..19c78d3 --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration +{ + public interface IMqttRpcClientTopicGenerationStrategy + { + MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context); + } +} diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs new file mode 100644 index 0000000..dc7263f --- /dev/null +++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs @@ -0,0 +1,16 @@ +using MQTTnet.Client; +using MQTTnet.Protocol; + +namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration +{ + public class TopicGenerationContext + { + public string MethodName { get; set; } + + public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } + + public IMqttClient MqttClient { get; set; } + + public IMqttRpcClientOptions Options { get; set; } + } +} diff --git a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs index 9f03172..a420697 100644 --- a/Tests/MQTTnet.Core.Tests/RPC_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/RPC_Tests.cs @@ -10,6 +10,8 @@ using MQTTnet.Extensions.Rpc; using MQTTnet.Protocol; using MQTTnet.Client.Options; using MQTTnet.Formatter; +using MQTTnet.Extensions.Rpc.Options; +using MQTTnet.Extensions.Rpc.Options.TopicGeneration; namespace MQTTnet.Tests { @@ -62,7 +64,22 @@ namespace MQTTnet.Tests var requestSender = await testEnvironment.ConnectClientAsync(); - var rpcClient = new MqttRpcClient(requestSender); + var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); + await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); + } + } + + [TestMethod] + [ExpectedException(typeof(MqttCommunicationTimedOutException))] + public async Task Execute_With_Custom_Topic_Names() + { + using (var testEnvironment = new TestEnvironment()) + { + await testEnvironment.StartServerAsync(); + + var requestSender = await testEnvironment.ConnectClientAsync(); + + var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build()); await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); } } @@ -82,11 +99,23 @@ namespace MQTTnet.Tests var requestSender = await testEnvironment.ConnectClientAsync(); - var rpcClient = new MqttRpcClient(requestSender); + var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); } } + + private class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy + { + public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) + { + return new MqttRpcTopicPair + { + RequestTopic = "a", + ResponseTopic = "b" + }; + } + } } }