@@ -1,4 +1,4 @@ | |||
using MQTTnet.Client; | |||
using MQTTnet.Client; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Extensions.Rpc.Options; | |||
using MQTTnet.Extensions.Rpc.Options.TopicGeneration; | |||
@@ -90,6 +90,7 @@ namespace MQTTnet.Extensions.Rpc | |||
.WithTopic(requestTopic) | |||
.WithPayload(payload) | |||
.WithQualityOfServiceLevel(qualityOfServiceLevel) | |||
.WithResponseTopic(responseTopic) | |||
.Build(); | |||
try | |||
@@ -99,7 +100,7 @@ namespace MQTTnet.Extensions.Rpc | |||
#else | |||
var awaitable = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously); | |||
#endif | |||
if (!_waitingCalls.TryAdd(responseTopic, awaitable)) | |||
{ | |||
throw new InvalidOperationException(); | |||
@@ -8,7 +8,7 @@ namespace MQTTnet.Exceptions | |||
{ | |||
} | |||
public MqttCommunicationTimedOutException(Exception innerException) : base(innerException) | |||
public MqttCommunicationTimedOutException(Exception innerException) : base("The operation has timed out.", innerException) | |||
{ | |||
} | |||
} | |||
@@ -8,11 +8,6 @@ namespace MQTTnet.Server.Internal | |||
{ | |||
readonly DateTime _createdTimestamp = DateTime.UtcNow; | |||
/// <summary> | |||
/// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5) | |||
/// </summary> | |||
readonly bool _isPersistent; | |||
public MqttClientSession( | |||
string clientId, | |||
IDictionary<object, object> items, | |||
@@ -24,17 +19,19 @@ namespace MQTTnet.Server.Internal | |||
{ | |||
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId)); | |||
Items = items ?? throw new ArgumentNullException(nameof(items)); | |||
IsPersistent = isPersistent; | |||
SubscriptionsManager = new MqttClientSubscriptionsManager(this, serverOptions, eventDispatcher, retainedMessagesManager); | |||
ApplicationMessagesQueue = new MqttClientSessionApplicationMessagesQueue(serverOptions); | |||
_isPersistent = isPersistent; | |||
} | |||
public string ClientId { get; } | |||
public bool IsCleanSession { get; set; } = true; | |||
public bool IsPersistent => _isPersistent; | |||
/// <summary> | |||
/// Session should persist if CleanSession was set to false (Mqtt3) or if SessionExpiryInterval != 0 (Mqtt5) | |||
/// </summary> | |||
public bool IsPersistent { get; set; } | |||
public MqttApplicationMessage WillMessage { get; set; } | |||
@@ -507,7 +507,7 @@ namespace MQTTnet.Server.Internal | |||
// in each time it connects. | |||
// Persist if SessionExpiryInterval != 0, but may start with a clean session | |||
sessionShouldPersist = context.SessionExpiryInterval != 0; | |||
sessionShouldPersist = context.SessionExpiryInterval.GetValueOrDefault() != 0; | |||
} | |||
else | |||
{ | |||
@@ -540,6 +540,8 @@ namespace MQTTnet.Server.Internal | |||
else | |||
{ | |||
_logger.Verbose("Reusing existing session of client '{0}'.", connectPacket.ClientId); | |||
// Session persistence could change for MQTT 5 clients that reconnect with different SessionExpiryInterval | |||
session.IsPersistent = sessionShouldPersist; | |||
connAckPacket.IsSessionPresent = true; | |||
} | |||
} | |||
@@ -51,15 +51,8 @@ namespace MQTTnet.Tests.MQTTv5 | |||
const string ClientId = "Client1"; | |||
// Create client without clean session and long session expiry interval | |||
var client1 = await testEnvironment.ConnectClient(o => o | |||
.WithProtocolVersion(MqttProtocolVersion.V500) | |||
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort) | |||
.WithSessionExpiryInterval(9999) | |||
.WithCleanSession(false) | |||
.WithClientId(ClientId) | |||
.Build() | |||
); | |||
var options1 = CreateClientOptions(testEnvironment, ClientId, false, 9999); | |||
var client1 = await testEnvironment.ConnectClient(options1); | |||
// Disconnect; empty session should remain on server | |||
@@ -72,23 +65,121 @@ namespace MQTTnet.Tests.MQTTv5 | |||
// Reconnect the same client ID to existing session | |||
var client2 = testEnvironment.CreateClient(); | |||
var options = testEnvironment.Factory.CreateClientOptionsBuilder() | |||
.WithProtocolVersion(MqttProtocolVersion.V500) | |||
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort) | |||
.WithSessionExpiryInterval(9999) | |||
.WithCleanSession(false) | |||
.WithClientId(ClientId) | |||
.Build(); | |||
var options2 = CreateClientOptions(testEnvironment, ClientId, false, 9999); | |||
var result = await client2.ConnectAsync(options2).ConfigureAwait(false); | |||
await client2.DisconnectAsync(); | |||
// Session should be present | |||
Assert.IsTrue(result.IsSessionPresent, "Session not present"); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Connect_with_Undefined_SessionExpiryInterval() | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
// Create server with persistent sessions enabled | |||
await testEnvironment.StartServer(o => o.WithPersistentSessions()); | |||
const string ClientId = "Client1"; | |||
// Create client without clean session and NO session expiry interval, | |||
// that means, the session should not persist | |||
var options1 = CreateClientOptions(testEnvironment, ClientId, false, null); | |||
var client1 = await testEnvironment.ConnectClient(options1); | |||
// Disconnect; no session should remain on server because the session expiry interval was undefined | |||
await client1.DisconnectAsync(); | |||
// Simulate some time delay between connections | |||
await Task.Delay(1000); | |||
// Reconnect the same client ID to existing session | |||
var client2 = testEnvironment.CreateClient(); | |||
var options2 = CreateClientOptions(testEnvironment, ClientId, false, 9999); | |||
var result = await client2.ConnectAsync(options2).ConfigureAwait(false); | |||
await client2.DisconnectAsync(); | |||
// Session should not be present | |||
Assert.IsTrue(!result.IsSessionPresent, "Session is present when it should not"); | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Reconnect_with_different_SessionExpiryInterval() | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
// Create server with persistent sessions enabled | |||
await testEnvironment.StartServer(o => o.WithPersistentSessions()); | |||
const string ClientId = "Client1"; | |||
// Create client with clean session and session expiry interval > 0 | |||
var options = CreateClientOptions(testEnvironment, ClientId, true, 9999); | |||
var client1 = await testEnvironment.ConnectClient(options); | |||
// Disconnect; session should remain on server | |||
await client1.DisconnectAsync(); | |||
// Simulate some time delay between connections | |||
await Task.Delay(1000); | |||
var result = await client2.ConnectAsync(options).ConfigureAwait(false); | |||
// Reconnect the same client ID to the existing session but leave session expiry interval undefined this time. | |||
// Session should be present because the client1 connection had SessionExpiryInterval > 0 | |||
var client2 = testEnvironment.CreateClient(); | |||
var options2 = CreateClientOptions(testEnvironment, ClientId, false, null); | |||
var result2 = await client2.ConnectAsync(options2).ConfigureAwait(false); | |||
await client2.DisconnectAsync(); | |||
Assert.IsTrue(result2.IsSessionPresent, "Session is not present when it should"); | |||
// Simulate some time delay between connections | |||
await Task.Delay(1000); | |||
// Reconnect the same client ID. | |||
// No session should be present because the previous session expiry interval was undefined for the client2 connection | |||
var client3 = testEnvironment.CreateClient(); | |||
var options3 = CreateClientOptions(testEnvironment, ClientId, false, null); | |||
var result3 = await client2.ConnectAsync(options3).ConfigureAwait(false); | |||
await client3.DisconnectAsync(); | |||
// Session should be present | |||
Assert.IsTrue(result.IsSessionPresent, "Session not present"); | |||
Assert.IsTrue(!result3.IsSessionPresent, "Session is present when it should not"); | |||
} | |||
} | |||
IMqttClientOptions CreateClientOptions(TestEnvironment testEnvironment, string clientId, bool cleanSession, uint? sessionExpiryInterval) | |||
{ | |||
return testEnvironment.Factory.CreateClientOptionsBuilder() | |||
.WithProtocolVersion(MqttProtocolVersion.V500) | |||
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort) | |||
.WithSessionExpiryInterval(sessionExpiryInterval) | |||
.WithCleanSession(cleanSession) | |||
.WithClientId(clientId) | |||
.Build(); | |||
} | |||
} | |||
} |
@@ -56,6 +56,25 @@ namespace MQTTnet.Tests | |||
return Execute_Success(MqttQualityOfServiceLevel.ExactlyOnce, MqttProtocolVersion.V500); | |||
} | |||
[TestMethod] | |||
public Task Execute_Success_With_QoS_0_MQTT_V5_Use_ResponseTopic() | |||
{ | |||
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtMostOnce); | |||
} | |||
[TestMethod] | |||
public Task Execute_Success_With_QoS_1_MQTT_V5_Use_ResponseTopic() | |||
{ | |||
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.AtLeastOnce); | |||
} | |||
[TestMethod] | |||
public Task Execute_Success_With_QoS_2_MQTT_V5_Use_ResponseTopic() | |||
{ | |||
return Execute_Success_MQTT_V5(MqttQualityOfServiceLevel.ExactlyOnce); | |||
} | |||
[TestMethod] | |||
[ExpectedException(typeof(MqttCommunicationTimedOutException))] | |||
public async Task Execute_Timeout() | |||
@@ -63,7 +82,7 @@ namespace MQTTnet.Tests | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
await testEnvironment.StartServer(); | |||
var requestSender = await testEnvironment.ConnectClient(); | |||
var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()); | |||
@@ -81,8 +100,8 @@ namespace MQTTnet.Tests | |||
var requestSender = await testEnvironment.ConnectClient(); | |||
var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()) .Build()); | |||
var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder().WithTopicGenerationStrategy(new TestTopicStrategy()).Build()); | |||
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||
} | |||
} | |||
@@ -105,12 +124,86 @@ namespace MQTTnet.Tests | |||
using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) | |||
{ | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); | |||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||
} | |||
} | |||
} | |||
async Task Execute_Success_MQTT_V5(MqttQualityOfServiceLevel qosLevel) | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
await testEnvironment.StartServer(); | |||
var responseSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); | |||
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", qosLevel); | |||
responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => | |||
{ | |||
await responseSender.PublishAsync(e.ApplicationMessage.ResponseTopic, "pong"); | |||
}); | |||
var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); | |||
using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) | |||
{ | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", qosLevel); | |||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||
} | |||
} | |||
} | |||
[TestMethod] | |||
public async Task Execute_Success_MQTT_V5_Mixed_Clients() | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
await testEnvironment.StartServer(); | |||
var responseSender = await testEnvironment.ConnectClient(); | |||
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce); | |||
responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => | |||
{ | |||
Assert.IsNull(e.ApplicationMessage.ResponseTopic); | |||
await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong"); | |||
}); | |||
var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); | |||
using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) | |||
{ | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||
Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); | |||
} | |||
} | |||
} | |||
[TestMethod] | |||
[ExpectedException(typeof(MqttCommunicationTimedOutException))] | |||
public async Task Execute_Timeout_MQTT_V5_Mixed_Clients() | |||
{ | |||
using (var testEnvironment = new TestEnvironment(TestContext)) | |||
{ | |||
await testEnvironment.StartServer(); | |||
var responseSender = await testEnvironment.ConnectClient(); | |||
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping", MqttQualityOfServiceLevel.AtMostOnce); | |||
responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e => | |||
{ | |||
Assert.IsNull(e.ApplicationMessage.ResponseTopic); | |||
}); | |||
var requestSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder().WithProtocolVersion(MqttProtocolVersion.V500)); | |||
using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build())) | |||
{ | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce); | |||
} | |||
} | |||
} | |||
class TestTopicStrategy : IMqttRpcClientTopicGenerationStrategy | |||
{ | |||
public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) | |||