You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

267 lines
11 KiB

  1. using IronPython.Runtime;
  2. using Microsoft.AspNetCore.Http;
  3. using Microsoft.Extensions.Logging;
  4. using MQTTnet.Adapter;
  5. using MQTTnet.AspNetCore;
  6. using MQTTnet.Client.Publishing;
  7. using MQTTnet.Implementations;
  8. using MQTTnet.Protocol;
  9. using MQTTnet.Server.Configuration;
  10. using MQTTnet.Server.Scripting;
  11. using MQTTnet.Server.Status;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Linq;
  15. using System.Net.WebSockets;
  16. using System.Security.Authentication;
  17. using System.Text;
  18. using System.Threading.Tasks;
  19. namespace MQTTnet.Server.Mqtt
  20. {
  21. public class MqttServerService
  22. {
  23. readonly ILogger<MqttServerService> _logger;
  24. readonly MqttSettingsModel _settings;
  25. readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor;
  26. readonly MqttServerStorage _mqttServerStorage;
  27. readonly MqttClientConnectedHandler _mqttClientConnectedHandler;
  28. readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler;
  29. readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler;
  30. readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler;
  31. readonly MqttServerConnectionValidator _mqttConnectionValidator;
  32. readonly IMqttServer _mqttServer;
  33. readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
  34. readonly MqttUnsubscriptionInterceptor _mqttUnsubscriptionInterceptor;
  35. readonly PythonScriptHostService _pythonScriptHostService;
  36. readonly MqttWebSocketServerAdapter _webSocketServerAdapter;
  37. public MqttServerService(
  38. MqttSettingsModel mqttSettings,
  39. CustomMqttFactory mqttFactory,
  40. MqttClientConnectedHandler mqttClientConnectedHandler,
  41. MqttClientDisconnectedHandler mqttClientDisconnectedHandler,
  42. MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler,
  43. MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
  44. MqttServerConnectionValidator mqttConnectionValidator,
  45. MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
  46. MqttUnsubscriptionInterceptor mqttUnsubscriptionInterceptor,
  47. MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
  48. MqttServerStorage mqttServerStorage,
  49. PythonScriptHostService pythonScriptHostService,
  50. ILogger<MqttServerService> logger)
  51. {
  52. _settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings));
  53. _mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler));
  54. _mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler));
  55. _mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler));
  56. _mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
  57. _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
  58. _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
  59. _mqttUnsubscriptionInterceptor = mqttUnsubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttUnsubscriptionInterceptor));
  60. _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
  61. _mqttServerStorage = mqttServerStorage ?? throw new ArgumentNullException(nameof(mqttServerStorage));
  62. _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
  63. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  64. _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger);
  65. var adapters = new List<IMqttServerAdapter>
  66. {
  67. new MqttTcpServerAdapter(mqttFactory.Logger)
  68. {
  69. TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services.
  70. },
  71. _webSocketServerAdapter
  72. };
  73. _mqttServer = mqttFactory.CreateMqttServer(adapters);
  74. }
  75. public void Configure()
  76. {
  77. _pythonScriptHostService.RegisterProxyObject("publish", new Action<PythonDictionary>(Publish));
  78. _mqttServerStorage.Configure();
  79. _mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler;
  80. _mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler;
  81. _mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler;
  82. _mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler;
  83. _mqttServer.StartAsync(CreateMqttServerOptions()).GetAwaiter().GetResult();
  84. _logger.LogInformation("MQTT server started.");
  85. }
  86. public Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext httpContext)
  87. {
  88. return _webSocketServerAdapter.RunWebSocketConnectionAsync(webSocket, httpContext);
  89. }
  90. public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
  91. {
  92. return _mqttServer.GetClientStatusAsync();
  93. }
  94. public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
  95. {
  96. return _mqttServer.GetSessionStatusAsync();
  97. }
  98. public Task ClearRetainedApplicationMessagesAsync()
  99. {
  100. return _mqttServer.ClearRetainedApplicationMessagesAsync();
  101. }
  102. public Task<IList<MqttApplicationMessage>> GetRetainedApplicationMessagesAsync()
  103. {
  104. return _mqttServer.GetRetainedApplicationMessagesAsync();
  105. }
  106. public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
  107. {
  108. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  109. return _mqttServer.PublishAsync(applicationMessage);
  110. }
  111. void Publish(PythonDictionary parameters)
  112. {
  113. try
  114. {
  115. var applicationMessageBuilder = new MqttApplicationMessageBuilder()
  116. .WithTopic((string)parameters.get("topic", null))
  117. .WithRetainFlag((bool)parameters.get("retain", false))
  118. .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0));
  119. var payload = parameters.get("payload", null);
  120. byte[] binaryPayload;
  121. if (payload == null)
  122. {
  123. binaryPayload = new byte[0];
  124. }
  125. else if (payload is string stringPayload)
  126. {
  127. binaryPayload = Encoding.UTF8.GetBytes(stringPayload);
  128. }
  129. else if (payload is ByteArray byteArray)
  130. {
  131. binaryPayload = byteArray.ToArray();
  132. }
  133. else if (payload is IEnumerable<int> intArray)
  134. {
  135. binaryPayload = intArray.Select(Convert.ToByte).ToArray();
  136. }
  137. else
  138. {
  139. throw new NotSupportedException("Payload type not supported.");
  140. }
  141. applicationMessageBuilder = applicationMessageBuilder
  142. .WithPayload(binaryPayload);
  143. var applicationMessage = applicationMessageBuilder.Build();
  144. _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
  145. }
  146. catch (Exception exception)
  147. {
  148. _logger.LogError(exception, "Error while publishing application message from server.");
  149. }
  150. }
  151. IMqttServerOptions CreateMqttServerOptions()
  152. {
  153. var options = new MqttServerOptionsBuilder()
  154. .WithMaxPendingMessagesPerClient(_settings.MaxPendingMessagesPerClient)
  155. .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(_settings.CommunicationTimeout))
  156. .WithConnectionValidator(_mqttConnectionValidator)
  157. .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
  158. .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
  159. .WithUnsubscriptionInterceptor(_mqttUnsubscriptionInterceptor)
  160. .WithStorage(_mqttServerStorage);
  161. // Configure unencrypted connections
  162. if (_settings.TcpEndPoint.Enabled)
  163. {
  164. options.WithDefaultEndpoint();
  165. if (_settings.TcpEndPoint.TryReadIPv4(out var address4))
  166. {
  167. options.WithDefaultEndpointBoundIPAddress(address4);
  168. }
  169. if (_settings.TcpEndPoint.TryReadIPv6(out var address6))
  170. {
  171. options.WithDefaultEndpointBoundIPV6Address(address6);
  172. }
  173. if (_settings.TcpEndPoint.Port > 0)
  174. {
  175. options.WithDefaultEndpointPort(_settings.TcpEndPoint.Port);
  176. }
  177. }
  178. else
  179. {
  180. options.WithoutDefaultEndpoint();
  181. }
  182. // Configure encrypted connections
  183. if (_settings.EncryptedTcpEndPoint.Enabled)
  184. {
  185. options
  186. .WithEncryptedEndpoint()
  187. .WithEncryptionSslProtocol(SslProtocols.Tls12);
  188. if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path))
  189. {
  190. IMqttServerCertificateCredentials certificateCredentials = null;
  191. if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Password))
  192. {
  193. certificateCredentials = new MqttServerCertificateCredentials
  194. {
  195. Password = _settings.EncryptedTcpEndPoint.Certificate.Password
  196. };
  197. }
  198. options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials);
  199. }
  200. if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
  201. {
  202. options.WithEncryptedEndpointBoundIPAddress(address4);
  203. }
  204. if (_settings.EncryptedTcpEndPoint.TryReadIPv6(out var address6))
  205. {
  206. options.WithEncryptedEndpointBoundIPV6Address(address6);
  207. }
  208. if (_settings.EncryptedTcpEndPoint.Port > 0)
  209. {
  210. options.WithEncryptedEndpointPort(_settings.EncryptedTcpEndPoint.Port);
  211. }
  212. }
  213. else
  214. {
  215. options.WithoutEncryptedEndpoint();
  216. }
  217. if (_settings.ConnectionBacklog > 0)
  218. {
  219. options.WithConnectionBacklog(_settings.ConnectionBacklog);
  220. }
  221. if (_settings.EnablePersistentSessions)
  222. {
  223. options.WithPersistentSessions();
  224. }
  225. return options.Build();
  226. }
  227. }
  228. }