You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

MqttServerService.cs 10 KiB

5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
5 jaren geleden
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Security.Authentication;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. using IronPython.Runtime;
  9. using Microsoft.AspNetCore.Http;
  10. using Microsoft.Extensions.Logging;
  11. using MQTTnet.Adapter;
  12. using MQTTnet.AspNetCore;
  13. using MQTTnet.Client.Publishing;
  14. using MQTTnet.Implementations;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server.Configuration;
  17. using MQTTnet.Server.Scripting;
  18. using MQTTnet.Server.Status;
  19. namespace MQTTnet.Server.Mqtt
  20. {
  /// <summary>
  /// Owns the <see cref="IMqttServer"/> instance of the application: wires the injected
  /// handlers, interceptors and storage into it, builds the server options from
  /// <see cref="SettingsModel"/>, starts the server and exposes a small facade
  /// (status queries, retained messages, publishing, WebSocket connections).
  /// </summary>
  public class MqttServerService
  {
      private readonly ILogger<MqttServerService> _logger;
      private readonly SettingsModel _settings;
      private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor;
      private readonly MqttServerStorage _mqttServerStorage;
      private readonly MqttClientConnectedHandler _mqttClientConnectedHandler;
      private readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler;
      private readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler;
      private readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler;
      private readonly MqttConnectionValidator _mqttConnectionValidator;
      private readonly IMqttServer _mqttServer;
      private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
      private readonly PythonScriptHostService _pythonScriptHostService;
      private readonly MqttWebSocketServerAdapter _webSocketServerAdapter;

      /// <summary>
      /// Creates the service and the underlying MQTT server with a TCP adapter and a
      /// WebSocket adapter. The server is not started here; call <see cref="Configure"/>.
      /// </summary>
      /// <exception cref="ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
      public MqttServerService(
          SettingsModel settings,
          CustomMqttFactory mqttFactory,
          MqttClientConnectedHandler mqttClientConnectedHandler,
          MqttClientDisconnectedHandler mqttClientDisconnectedHandler,
          MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler,
          MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
          MqttConnectionValidator mqttConnectionValidator,
          MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
          MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
          MqttServerStorage mqttServerStorage,
          PythonScriptHostService pythonScriptHostService,
          ILogger<MqttServerService> logger)
      {
          _settings = settings ?? throw new ArgumentNullException(nameof(settings));
          _mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler));
          _mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler));
          _mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler));
          _mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
          _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
          _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
          _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
          _mqttServerStorage = mqttServerStorage ?? throw new ArgumentNullException(nameof(mqttServerStorage));
          _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));

          // The factory is dereferenced below, so validate it like every other argument
          // instead of letting a NullReferenceException escape.
          if (mqttFactory == null) throw new ArgumentNullException(nameof(mqttFactory));

          // The WebSocket adapter is kept in a field because incoming WebSocket
          // connections are handed to it in RunWebSocketConnectionAsync.
          _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger());

          var adapters = new List<IMqttServerAdapter>
          {
              new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()),
              _webSocketServerAdapter
          };

          _mqttServer = mqttFactory.CreateMqttServer(adapters);
      }

      /// <summary>
      /// Registers the <c>publish</c> proxy object with the Python script host, configures
      /// the storage, attaches the client/subscription handlers and starts the MQTT server.
      /// </summary>
      /// <remarks>
      /// The start is awaited synchronously, so this method blocks the calling thread
      /// until <c>StartAsync</c> has completed.
      /// </remarks>
      public void Configure()
      {
          _pythonScriptHostService.RegisterProxyObject("publish", new Action<PythonDictionary>(Publish));

          _mqttServerStorage.Configure();

          _mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler;
          _mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler;
          _mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler;
          _mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler;

          // Blocking on purpose: the method signature is synchronous.
          _mqttServer.StartAsync(CreateMqttServerOptions()).GetAwaiter().GetResult();

          _logger.LogInformation("MQTT server started.");
      }

      /// <summary>
      /// Forwards an accepted WebSocket connection to the WebSocket server adapter.
      /// </summary>
      /// <param name="webSocket">The accepted WebSocket.</param>
      /// <param name="httpContext">The HTTP context of the upgrade request.</param>
      /// <returns>The task returned by the adapter for this connection.</returns>
      public Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext httpContext)
      {
          return _webSocketServerAdapter.RunWebSocketConnectionAsync(webSocket, httpContext);
      }

      /// <summary>Returns the client status list reported by the MQTT server.</summary>
      public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
      {
          return _mqttServer.GetClientStatusAsync();
      }

      /// <summary>Returns the session status list reported by the MQTT server.</summary>
      public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
      {
          return _mqttServer.GetSessionStatusAsync();
      }

      /// <summary>Asks the MQTT server to clear its retained application messages.</summary>
      public Task ClearRetainedApplicationMessagesAsync()
      {
          return _mqttServer.ClearRetainedApplicationMessagesAsync();
      }

      /// <summary>Returns the retained application messages held by the MQTT server.</summary>
      public Task<IList<MqttApplicationMessage>> GetRetainedApplicationMessagesAsync()
      {
          return _mqttServer.GetRetainedApplicationMessagesAsync();
      }

      /// <summary>
      /// Publishes the given application message through the MQTT server.
      /// </summary>
      /// <param name="applicationMessage">The message to publish.</param>
      /// <returns>The publish result produced by the server.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
      public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
      {
          if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

          return _mqttServer.PublishAsync(applicationMessage);
      }

      /// <summary>
      /// Callback registered as the <c>publish</c> proxy object for Python scripts.
      /// Builds an application message from the dictionary and publishes it.
      /// </summary>
      /// <param name="parameters">
      /// Dictionary read with the keys <c>topic</c> (default <c>null</c>), <c>retain</c>
      /// (default <c>false</c>), <c>qos</c> (default <c>0</c>) and <c>payload</c>
      /// (default <c>null</c>).
      /// </param>
      /// <remarks>
      /// Best effort: any exception (invalid casts, unsupported payload type, publish
      /// failures) is logged and swallowed so that it does not propagate to the caller.
      /// The publish is awaited synchronously.
      /// </remarks>
      private void Publish(PythonDictionary parameters)
      {
          try
          {
              var applicationMessage = new MqttApplicationMessageBuilder()
                  .WithTopic((string)parameters.get("topic", null))
                  .WithRetainFlag((bool)parameters.get("retain", false))
                  .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0))
                  .WithPayload(ConvertPayload(parameters.get("payload", null)))
                  .Build();

              _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
          }
          catch (Exception exception)
          {
              _logger.LogError(exception, "Error while publishing application message from server.");
          }
      }

      /// <summary>
      /// Converts a script-supplied payload value into the raw bytes of the message.
      /// </summary>
      /// <param name="payload">
      /// <c>null</c> (empty payload), a <see cref="string"/> (encoded as UTF-8),
      /// a <see cref="ByteArray"/> or a sequence of <see cref="int"/> values
      /// (each converted with <see cref="Convert.ToByte(int)"/>).
      /// </param>
      /// <returns>The binary payload; never <c>null</c>.</returns>
      /// <exception cref="NotSupportedException">The payload has any other type.</exception>
      private static byte[] ConvertPayload(object payload)
      {
          switch (payload)
          {
              case null:
                  return Array.Empty<byte>();

              case string stringPayload:
                  return Encoding.UTF8.GetBytes(stringPayload);

              case ByteArray byteArray:
                  return byteArray.ToArray();

              case IEnumerable<int> intArray:
                  // Convert.ToByte throws for values outside 0..255.
                  return intArray.Select(Convert.ToByte).ToArray();

              default:
                  throw new NotSupportedException("Payload type not supported.");
          }
      }

      /// <summary>
      /// Builds the server options from the settings: limits and timeouts, the injected
      /// validator/interceptors/storage, the unencrypted and encrypted TCP endpoints,
      /// the connection backlog and persistent sessions.
      /// </summary>
      /// <returns>The built server options.</returns>
      private IMqttServerOptions CreateMqttServerOptions()
      {
          var options = new MqttServerOptionsBuilder()
              .WithMaxPendingMessagesPerClient(_settings.MaxPendingMessagesPerClient)
              .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(_settings.CommunicationTimeout))
              .WithConnectionValidator(_mqttConnectionValidator)
              .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
              .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
              .WithStorage(_mqttServerStorage);

          // Configure unencrypted connections
          if (_settings.TcpEndPoint.Enabled)
          {
              options.WithDefaultEndpoint();

              // Bound addresses and port are only overridden when the settings provide them.
              if (_settings.TcpEndPoint.TryReadIPv4(out var address4))
              {
                  options.WithDefaultEndpointBoundIPAddress(address4);
              }

              if (_settings.TcpEndPoint.TryReadIPv6(out var address6))
              {
                  options.WithDefaultEndpointBoundIPV6Address(address6);
              }

              if (_settings.TcpEndPoint.Port > 0)
              {
                  options.WithDefaultEndpointPort(_settings.TcpEndPoint.Port);
              }
          }
          else
          {
              options.WithoutDefaultEndpoint();
          }

          // Configure encrypted connections
          if (_settings.EncryptedTcpEndPoint.Enabled)
          {
              // The encrypted endpoint is fixed to TLS 1.2 and uses the certificate
              // read from the endpoint settings.
              options
                  .WithEncryptedEndpoint()
                  .WithEncryptionSslProtocol(SslProtocols.Tls12)
                  .WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.ReadCertificate());

              if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
              {
                  options.WithEncryptedEndpointBoundIPAddress(address4);
              }

              if (_settings.EncryptedTcpEndPoint.TryReadIPv6(out var address6))
              {
                  options.WithEncryptedEndpointBoundIPV6Address(address6);
              }

              if (_settings.EncryptedTcpEndPoint.Port > 0)
              {
                  options.WithEncryptedEndpointPort(_settings.EncryptedTcpEndPoint.Port);
              }
          }
          else
          {
              options.WithoutEncryptedEndpoint();
          }

          // A non-positive backlog keeps whatever the builder uses by default.
          if (_settings.ConnectionBacklog > 0)
          {
              options.WithConnectionBacklog(_settings.ConnectionBacklog);
          }

          if (_settings.EnablePersistentSessions)
          {
              options.WithPersistentSessions();
          }

          return options.Build();
      }
  }
  210. }