Browse Source

MqttRpcClient now adds ResponseTopic to MqttApplicationMessage when using MQTT v5 (#1295)

* MqttRpcClient now adds ResponseTopic to MqttApplicationMessage when using MQTT v5

* No need for conditional protocol check (ResponseTopic property won't be added by PacketFormatter in <MQTT v5)

* Cleanup: Remove using
release/3.x.x
Michi 3 years ago
committed by GitHub
parent
commit
25ddfc83a8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 6 deletions
  1. +3
    -2
      Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
  2. +97
    -4
      Tests/MQTTnet.Core.Tests/RPC_Tests.cs

+ 3
- 2
Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs View File

@@ -1,4 +1,4 @@
using MQTTnet.Client;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Extensions.Rpc.Options;
using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
@@ -90,6 +90,7 @@ namespace MQTTnet.Extensions.Rpc
.WithTopic(requestTopic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qualityOfServiceLevel)
.WithResponseTopic(responseTopic)
.Build();

try
@@ -99,7 +100,7 @@ namespace MQTTnet.Extensions.Rpc
#else
var awaitable = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
#endif
if (!_waitingCalls.TryAdd(responseTopic, awaitable))
{
throw new InvalidOperationException();


+ 97
- 4
Tests/MQTTnet.Core.Tests/RPC_Tests.cs View File

@@ -56,6 +56,25 @@ namespace MQTTnet.Tests
return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V500);
}

[TestMethod]
public Task Execute_Success_With_QoS_0_MQTT_V5_Use_ResponseTopic()
{
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtMostOnce);
}

[TestMethod]
public Task Execute_Success_With_QoS_1_MQTT_V5_Use_ResponseTopic()
{
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtLeastOnce);
}

[TestMethod]
public Task Execute_Success_With_QoS_2_MQTT_V5_Use_ResponseTopic()
{
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.ExactlyOnce);
}


[TestMethod]
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_Timeout()
@@ -63,7 +82,7 @@ namespace MQTTnet.Tests
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServer();
var requestSender = await testEnvironment.ConnectClient();

var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build());
@@ -81,8 +100,8 @@ namespace MQTTnet.Tests

var requestSender = await testEnvironment.ConnectClient();

var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build());
var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()).Build());
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
@@ -105,12 +124,86 @@ namespace MQTTnet.Tests
using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}
}

async Task Execute_Success_MQTT_V5(MqttQualityOfServiceLevel qosLevel)
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServer();
var responseSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500));
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", qosLevel);

responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e =>
{
await responseSender.PublishAsync(e.ApplicationMessage.ResponseTopic, "pong");
});

var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500));

using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}
}

[TestMethod]
public async Task Execute_Success_MQTT_V5_Mixed_Clients()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServer();
var responseSender = await testEnvironment.ConnectClient();
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce);

responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e =>
{
Assert.IsNull(e.ApplicationMessage.ResponseTopic);
await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong");
});

var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500));

using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}
}

[TestMethod]
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_Timeout_MQTT_V5_Mixed_Clients()
{
using (var testEnvironment = new TestEnvironment(TestContext))
{
await testEnvironment.StartServer();
var responseSender = await testEnvironment.ConnectClient();
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce);

responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e =>
{
Assert.IsNull(e.ApplicationMessage.ResponseTopic);
});

var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500));

using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
}

class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy
{
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)


Loading…
Cancel
Save