You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

ServerTest.cs 4.5 KiB

7 years ago
7 years ago
7 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using System;
  2. using System.Text;
  3. using System.Threading.Tasks;
  4. using MQTTnet.Protocol;
  5. using MQTTnet.Server;
  6. using Newtonsoft.Json.Linq;
  7. namespace MQTTnet.TestApp.NetCore
  8. {
  9. public static class ServerTest
  10. {
  11. public static async Task RunAsync()
  12. {
  13. try
  14. {
  15. MqttNetConsoleLogger.ForwardToConsole();
  16. var options = new MqttServerOptions
  17. {
  18. ConnectionValidator = p =>
  19. {
  20. if (p.ClientId == "SpecialClient")
  21. {
  22. if (p.Username != "USER" || p.Password != "PASS")
  23. {
  24. p.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
  25. }
  26. }
  27. },
  28. Storage = new RetainedMessageHandler(),
  29. ApplicationMessageInterceptor = context =>
  30. {
  31. if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#"))
  32. {
  33. // Replace the payload with the timestamp. But also extending a JSON
  34. // based payload with the timestamp is a suitable use case.
  35. context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
  36. }
  37. },
  38. SubscriptionInterceptor = context =>
  39. {
  40. if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
  41. {
  42. context.AcceptSubscription = false;
  43. }
  44. if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
  45. {
  46. context.AcceptSubscription = false;
  47. context.CloseConnection = true;
  48. }
  49. }
  50. };
  51. // Extend the timestamp for all messages from clients.
  52. // Protect several topics from being subscribed from every client.
  53. //var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
  54. //options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
  55. //options.ConnectionBacklog = 5;
  56. //options.DefaultEndpointOptions.IsEnabled = true;
  57. //options.TlsEndpointOptions.IsEnabled = false;
  58. var mqttServer = new MqttFactory().CreateMqttServer();
  59. mqttServer.ApplicationMessageReceived += (s, e) =>
  60. {
  61. MqttNetConsoleLogger.PrintToConsole(
  62. $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",
  63. ConsoleColor.Magenta);
  64. };
  65. options.ApplicationMessageInterceptor = c =>
  66. {
  67. if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
  68. {
  69. return;
  70. }
  71. try
  72. {
  73. var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
  74. var timestampProperty = content.Property("timestamp");
  75. if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
  76. {
  77. timestampProperty.Value = DateTime.Now.ToString("O");
  78. c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
  79. }
  80. }
  81. catch (Exception e)
  82. {
  83. }
  84. };
  85. mqttServer.ClientDisconnected += (s, e) =>
  86. {
  87. Console.Write("Client disconnected event fired.");
  88. };
  89. await mqttServer.StartAsync(options);
  90. Console.WriteLine("Press any key to exit.");
  91. Console.ReadLine();
  92. await mqttServer.StopAsync();
  93. }
  94. catch (Exception e)
  95. {
  96. Console.WriteLine(e);
  97. }
  98. Console.ReadLine();
  99. }
  100. }
  101. }