You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

175 lines
6.6 KiB

  1. using System.Linq;
  2. using System.Text;
  3. using System.Threading.Tasks;
  4. using Microsoft.VisualStudio.TestTools.UnitTesting;
  5. using MQTTnet.Client;
  6. using MQTTnet.Client.Options;
  7. using MQTTnet.Client.Subscribing;
  8. using MQTTnet.Server;
  9. using MQTTnet.Tests.Mockups;
  10. namespace MQTTnet.Tests.Server
  11. {
  /// <summary>
  /// Tests for server-side session handling: session items set during connection validation,
  /// parallel connects that share one client id, and clean-session behavior when the server
  /// is started with persistent sessions.
  /// </summary>
  [TestClass]
  public class Session_Tests
  {
      /// <summary>
      /// Gets or sets the test context that is handed to every <see cref="TestEnvironment"/>
      /// created by the tests in this class.
      /// </summary>
      public TestContext TestContext { get; set; }

      /// <summary>
      /// Stores two session items in the connection validator and verifies that the
      /// subscription interceptor and the application message interceptor can read them:
      /// the subscription to "x" must be granted and the published payload must be replaced
      /// by the stored default payload.
      /// </summary>
      [TestMethod]
      public async Task Set_Session_Item()
      {
          using (var testEnvironment = new TestEnvironment(TestContext))
          {
              var serverOptions = new MqttServerOptionsBuilder()
                  .WithConnectionValidator(delegate (MqttConnectionValidatorContext context)
                  {
                      // Don't validate anything. Just set some session items.
                      context.SessionItems["can_subscribe_x"] = true;
                      context.SessionItems["default_payload"] = "Hello World";
                  })
                  .WithSubscriptionInterceptor(delegate (MqttSubscriptionInterceptorContext context)
                  {
                      if (context.TopicFilter.Topic == "x")
                      {
                          context.AcceptSubscription = context.SessionItems["can_subscribe_x"] as bool? == true;
                      }
                  })
                  .WithApplicationMessageInterceptor(delegate (MqttApplicationMessageInterceptorContext context)
                  {
                      // Direct cast instead of "as": a wrong-typed item now fails with a clear
                      // InvalidCastException rather than passing null on to GetBytes.
                      var defaultPayload = (string)context.SessionItems["default_payload"];
                      context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(defaultPayload);
                  });

              await testEnvironment.StartServer(serverOptions);

              string receivedPayload = null;

              var client = await testEnvironment.ConnectClient();
              client.UseApplicationMessageReceivedHandler(delegate (MqttApplicationMessageReceivedEventArgs args)
              {
                  receivedPayload = args.ApplicationMessage.ConvertPayloadToString();
              });

              var subscribeResult = await client.SubscribeAsync("x");
              Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items[0].ResultCode);

              // A second client publishes to "x" without a payload of its own.
              var client2 = await testEnvironment.ConnectClient();
              await client2.PublishAsync("x");

              // Give the message time to reach the subscribed client.
              await Task.Delay(1000);

              Assert.AreEqual("Hello World", receivedPayload);
          }
      }

      /// <summary>
      /// Stores two session items in the connection validator and verifies that both are
      /// exposed through the items of the first session status returned by the server.
      /// </summary>
      [TestMethod]
      public async Task Get_Session_Items_In_Status()
      {
          using (var testEnvironment = new TestEnvironment(TestContext))
          {
              var serverOptions = new MqttServerOptionsBuilder()
                  .WithConnectionValidator(delegate (MqttConnectionValidatorContext context)
                  {
                      // Don't validate anything. Just set some session items.
                      context.SessionItems["can_subscribe_x"] = true;
                      context.SessionItems["default_payload"] = "Hello World";
                  });

              await testEnvironment.StartServer(serverOptions);

              // Only the connection matters here; the returned client object is not used.
              await testEnvironment.ConnectClient();

              var sessionStatus = await testEnvironment.Server.GetSessionStatusAsync();
              var session = sessionStatus.First();

              Assert.AreEqual(true, session.Items["can_subscribe_x"]);
              Assert.AreEqual("Hello World", session.Items["default_payload"]);
          }
      }

      /// <summary>
      /// Starts ten connection attempts at once, all using the client id "1", and verifies
      /// that exactly one of the resulting clients reports itself as connected.
      /// </summary>
      [TestMethod]
      public async Task Manage_Session_MaxParallel()
      {
          using (var testEnvironment = new TestEnvironment(TestContext))
          {
              // NOTE(review): presumably set because the competing connects are expected to
              // log client errors - confirm against TestEnvironment.
              testEnvironment.IgnoreClientLogErrors = true;

              var serverOptions = new MqttServerOptionsBuilder();
              await testEnvironment.StartServer(serverOptions);

              var options = new MqttClientOptionsBuilder().WithClientId("1");

              var clients = await Task.WhenAll(Enumerable.Range(0, 10)
                  .Select(i => TryConnect(testEnvironment, options)));

              // TryConnect yields null for failed attempts, hence the null-conditional access.
              var connectedClients = clients.Where(c => c?.IsConnected ?? false).ToList();

              Assert.AreEqual(1, connectedClients.Count);
          }
      }

      /// <summary>
      /// Connects with a clean session, disconnects, then reconnects with the same client id
      /// and without a clean session, and verifies that the server reports no present session.
      /// </summary>
      [TestMethod]
      public async Task Clean_Session_Persistence()
      {
          using (var testEnvironment = new TestEnvironment(TestContext))
          {
              // Create server with persistent sessions enabled
              await testEnvironment.StartServer(o => o.WithPersistentSessions());

              const string ClientId = "Client1";

              // Create client with clean session and long session expiry interval
              var client1 = await testEnvironment.ConnectClient(o => o
                  .WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
                  .WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
                  .WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
                  .WithCleanSession(true) // start and end with clean session
                  .WithClientId(ClientId)
                  .Build()
              );

              // Disconnect; empty session should be removed from server
              await client1.DisconnectAsync();

              // Simulate some time delay between connections
              await Task.Delay(1000);

              // Reconnect the same client ID without clean session
              var client2 = testEnvironment.CreateClient();
              var options = testEnvironment.Factory.CreateClientOptionsBuilder()
                  .WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
                  .WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
                  .WithSessionExpiryInterval(9999) // not relevant for v311 but testing impact
                  .WithCleanSession(false) // see if there is a session
                  .WithClientId(ClientId)
                  .Build();

              var result = await client2.ConnectAsync(options);
              await client2.DisconnectAsync();

              // Session should NOT be present for MQTT v311 and initial CleanSession == true
              Assert.IsFalse(result.IsSessionPresent, "Session present");
          }
      }

      /// <summary>
      /// Attempts to connect a client through the given test environment.
      /// </summary>
      /// <param name="testEnvironment">Environment used to create and connect the client.</param>
      /// <param name="options">Client options builder passed on to the environment.</param>
      /// <returns>
      /// The client returned by the environment, or <c>null</c> if connecting threw an exception.
      /// </returns>
      private static async Task<IMqttClient> TryConnect(TestEnvironment testEnvironment, MqttClientOptionsBuilder options)
      {
          try
          {
              return await testEnvironment.ConnectClient(options);
          }
          catch (System.Exception)
          {
              // Deliberate best effort: a failed attempt is reported as null, not as an error.
              return null;
          }
      }
  }
  136. }