Browse Source

Add support for different strategies when generating RPC topics.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
2e3cf54f11
9 changed files with 156 additions and 5 deletions
  1. +30
    -3
      Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
  2. +9
    -0
      Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs
  3. +9
    -0
      Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs
  4. +25
    -0
      Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs
  5. +9
    -0
      Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs
  6. +20
    -0
      Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs
  7. +7
    -0
      Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs
  8. +16
    -0
      Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs
  9. +31
    -2
      Tests/MQTTnet.Core.Tests/RPC_Tests.cs

+ 30
- 3
Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs View File

@@ -5,6 +5,8 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Extensions.Rpc.Options;
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.Rpc
@@ -13,11 +15,18 @@ namespace MQTTnet.Extensions.Rpc
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();
private readonly IMqttClient _mqttClient;
private readonly IMqttRpcClientOptions _options;
private readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler;

public MqttRpcClient(IMqttClient mqttClient)
[Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")]
public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions())
{
}

public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
_options = options ?? throw new ArgumentNullException(nameof(options));

_applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler(
mqttClient.ApplicationMessageReceivedHandler,
@@ -55,8 +64,26 @@ namespace MQTTnet.Extensions.Rpc
throw new InvalidOperationException("The application message received handler was modified.");
}

var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}";
var responseTopic = requestTopic + "/response";
var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext
{
MethodName = methodName,
QualityOfServiceLevel = qualityOfServiceLevel,
MqttClient = _mqttClient,
Options = _options
});

var requestTopic = topicNames.RequestTopic;
var responseTopic = topicNames.ResponseTopic;

if (string.IsNullOrWhiteSpace(requestTopic))
{
throw new MqttProtocolViolationException("RPC request topic is empty.");
}

if (string.IsNullOrWhiteSpace(responseTopic))
{
throw new MqttProtocolViolationException("RPC response topic is empty.");
}

var requestMessage = new MqttApplicationMessageBuilder()
.WithTopic(requestTopic)


+ 9
- 0
Source/MQTTnet.Extensions.Rpc/Options/IMqttRpcClientOptions.cs View File

@@ -0,0 +1,9 @@
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;

namespace MQTTnet.Extensions.Rpc.Options
{
public interface IMqttRpcClientOptions
{
IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; }
}
}

+ 9
- 0
Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptions.cs View File

@@ -0,0 +1,9 @@
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;

namespace MQTTnet.Extensions.Rpc.Options
{
public class MqttRpcClientOptions : IMqttRpcClientOptions
{
public IMqttRpcClientTopicGenerationStrategy TopicGenerationStrategy { get; set; } = new DefaultMqttRpcClientTopicGenerationStrategy();
}
}

+ 25
- 0
Source/MQTTnet.Extensions.Rpc/Options/MqttRpcClientOptionsBuilder.cs View File

@@ -0,0 +1,25 @@
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
using System;

namespace MQTTnet.Extensions.Rpc.Options
{
public class MqttRpcClientOptionsBuilder
{
IMqttRpcClientTopicGenerationStrategy _topicGenerationStrategy = new DefaultMqttRpcClientTopicGenerationStrategy();

public MqttRpcClientOptionsBuilder WithTopicGenerationStrategy(IMqttRpcClientTopicGenerationStrategy value)
{
_topicGenerationStrategy = value ?? throw new ArgumentNullException(nameof(value));

return this;
}

public IMqttRpcClientOptions Build()
{
return new MqttRpcClientOptions
{
TopicGenerationStrategy = _topicGenerationStrategy
};
}
}
}

+ 9
- 0
Source/MQTTnet.Extensions.Rpc/Options/MqttRpcTopicPair.cs View File

@@ -0,0 +1,9 @@
namespace MQTTnet.Extensions.Rpc.Options
{
public class MqttRpcTopicPair
{
public string RequestTopic { get; set; }

public string ResponseTopic { get; set; }
}
}

+ 20
- 0
Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/DefaultMqttRpcClientTopicGenerationStrategy.cs View File

@@ -0,0 +1,20 @@
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
using System;

namespace MQTTnet.Extensions.Rpc.Options
{
public class DefaultMqttRpcClientTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy
{
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
{
var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}";
var responseTopic = requestTopic + "/response";

return new MqttRpcTopicPair
{
RequestTopic = requestTopic,
ResponseTopic = responseTopic
};
}
}
}

+ 7
- 0
Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/IMqttRpcClientTopicGenerationStrategy.cs View File

@@ -0,0 +1,7 @@
namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration
{
public interface IMqttRpcClientTopicGenerationStrategy
{
MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context);
}
}

+ 16
- 0
Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs View File

@@ -0,0 +1,16 @@
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.Rpc.Options.TopicGeneration
{
public class TopicGenerationContext
{
public string MethodName { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }

public IMqttClient MqttClient { get; set; }

public IMqttRpcClientOptions Options { get; set; }
}
}

+ 31
- 2
Tests/MQTTnet.Core.Tests/RPC_Tests.cs View File

@@ -10,6 +10,8 @@ using MQTTnet.Extensions.Rpc;
using MQTTnet.Protocol;
using MQTTnet.Client.Options;
using MQTTnet.Formatter;
using MQTTnet.Extensions.Rpc.Options;
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;

namespace MQTTnet.Tests
{
@@ -62,7 +64,22 @@ namespace MQTTnet.Tests
var requestSender = await testEnvironment.ConnectClientAsync();

var rpcClient = new MqttRpcClient(requestSender);
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build());
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}

[TestMethod]
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_With_Custom_Topic_Names()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync();

var requestSender = await testEnvironment.ConnectClientAsync();

var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build());
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
@@ -82,11 +99,23 @@ namespace MQTTnet.Tests

var requestSender = await testEnvironment.ConnectClientAsync();

var rpcClient = new MqttRpcClient(requestSender);
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build());
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}

private class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy
{
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
{
return new MqttRpcTopicPair
{
RequestTopic = "a",
ResponseTopic = "b"
};
}
}
}
}

Loading…
Cancel
Save