You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

267 lines
12 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Security.Authentication;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. using IronPython.Runtime;
  9. using Microsoft.AspNetCore.Http;
  10. using Microsoft.Extensions.Logging;
  11. using MQTTnet.Adapter;
  12. using MQTTnet.AspNetCore;
  13. using MQTTnet.Client.Publishing;
  14. using MQTTnet.Implementations;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server.Configuration;
  17. using MQTTnet.Server.Scripting;
  18. using MQTTnet.Server.Status;
  19. namespace MQTTnet.Server.Mqtt
  20. {
  /// <summary>
  /// Owns the MQTT server instance of the application. It wires the injected handlers,
  /// interceptors and storage into the server, starts it with options derived from
  /// <see cref="MqttSettingsModel"/>, and exposes a small facade (status, retained
  /// messages, publishing, WebSocket connections) over the running server.
  /// </summary>
  public class MqttServerService
  {
      private readonly ILogger<MqttServerService> _logger;
      private readonly MqttSettingsModel _settings;
      private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor;
      private readonly MqttServerStorage _mqttServerStorage;
      private readonly MqttClientConnectedHandler _mqttClientConnectedHandler;
      private readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler;
      private readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler;
      private readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler;
      private readonly MqttServerConnectionValidator _mqttConnectionValidator;
      private readonly IMqttServer _mqttServer;
      private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
      private readonly MqttUnsubscriptionInterceptor _mqttUnsubscriptionInterceptor;
      private readonly PythonScriptHostService _pythonScriptHostService;
      private readonly MqttWebSocketServerAdapter _webSocketServerAdapter;

      /// <summary>
      /// Stores the injected dependencies and creates the MQTT server with a TCP adapter
      /// and a WebSocket adapter. The server is not started here; see <see cref="Configure"/>.
      /// </summary>
      /// <exception cref="ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
      public MqttServerService(
          MqttSettingsModel mqttSettings,
          CustomMqttFactory mqttFactory,
          MqttClientConnectedHandler mqttClientConnectedHandler,
          MqttClientDisconnectedHandler mqttClientDisconnectedHandler,
          MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler,
          MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
          MqttServerConnectionValidator mqttConnectionValidator,
          MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
          MqttUnsubscriptionInterceptor mqttUnsubscriptionInterceptor,
          MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
          MqttServerStorage mqttServerStorage,
          PythonScriptHostService pythonScriptHostService,
          ILogger<MqttServerService> logger)
      {
          _settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings));

          // The factory is only used locally (not stored), but it is dereferenced below,
          // so validate it like every other dependency instead of failing with a
          // NullReferenceException.
          if (mqttFactory == null)
          {
              throw new ArgumentNullException(nameof(mqttFactory));
          }

          _mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler));
          _mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler));
          _mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler));
          _mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
          _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
          _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
          _mqttUnsubscriptionInterceptor = mqttUnsubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttUnsubscriptionInterceptor));
          _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
          _mqttServerStorage = mqttServerStorage ?? throw new ArgumentNullException(nameof(mqttServerStorage));
          _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));

          // The WebSocket adapter is kept in a field because incoming WebSocket
          // connections are handed to it later via RunWebSocketConnectionAsync.
          _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger());

          var adapters = new List<IMqttServerAdapter>
          {
              new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger())
              {
                  TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allowed in Azure App Services.
              },
              _webSocketServerAdapter
          };

          _mqttServer = mqttFactory.CreateMqttServer(adapters);
      }

      /// <summary>
      /// Registers the <c>publish</c> proxy for Python scripts, configures the storage,
      /// attaches the client/subscription handlers and starts the MQTT server.
      /// </summary>
      /// <remarks>
      /// This method blocks the calling thread until the server start task has completed.
      /// </remarks>
      public void Configure()
      {
          _pythonScriptHostService.RegisterProxyObject("publish", new Action<PythonDictionary>(Publish));

          _mqttServerStorage.Configure();

          _mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler;
          _mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler;
          _mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler;
          _mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler;

          // Synchronous wait: this method has a void signature, so the start task is awaited by blocking.
          _mqttServer.StartAsync(CreateMqttServerOptions()).GetAwaiter().GetResult();

          _logger.LogInformation("MQTT server started.");
      }

      /// <summary>
      /// Hands an accepted WebSocket connection over to the WebSocket server adapter.
      /// </summary>
      /// <param name="webSocket">The accepted WebSocket.</param>
      /// <param name="httpContext">The HTTP context of the upgrade request.</param>
      /// <returns>The task returned by the adapter for this connection.</returns>
      public Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext httpContext)
      {
          return _webSocketServerAdapter.RunWebSocketConnectionAsync(webSocket, httpContext);
      }

      /// <summary>
      /// Gets the client status list from the underlying MQTT server.
      /// </summary>
      public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
      {
          return _mqttServer.GetClientStatusAsync();
      }

      /// <summary>
      /// Gets the session status list from the underlying MQTT server.
      /// </summary>
      public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
      {
          return _mqttServer.GetSessionStatusAsync();
      }

      /// <summary>
      /// Asks the underlying MQTT server to clear its retained application messages.
      /// </summary>
      public Task ClearRetainedApplicationMessagesAsync()
      {
          return _mqttServer.ClearRetainedApplicationMessagesAsync();
      }

      /// <summary>
      /// Gets the retained application messages from the underlying MQTT server.
      /// </summary>
      public Task<IList<MqttApplicationMessage>> GetRetainedApplicationMessagesAsync()
      {
          return _mqttServer.GetRetainedApplicationMessagesAsync();
      }

      /// <summary>
      /// Publishes the given application message through the underlying MQTT server.
      /// </summary>
      /// <param name="applicationMessage">The message to publish.</param>
      /// <returns>The publish result task returned by the server.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
      public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
      {
          if (applicationMessage == null)
          {
              throw new ArgumentNullException(nameof(applicationMessage));
          }

          return _mqttServer.PublishAsync(applicationMessage);
      }

      /// <summary>
      /// Callback registered as the <c>publish</c> proxy object for Python scripts.
      /// Reads <c>topic</c>, <c>retain</c>, <c>qos</c> and <c>payload</c> from the dictionary,
      /// builds an application message and publishes it, blocking until publishing completes.
      /// Any exception (including invalid parameter types) is logged and not rethrown.
      /// </summary>
      /// <param name="parameters">The dictionary passed in by the script.</param>
      private void Publish(PythonDictionary parameters)
      {
          try
          {
              var applicationMessage = new MqttApplicationMessageBuilder()
                  .WithTopic((string)parameters.get("topic", null))
                  .WithRetainFlag((bool)parameters.get("retain", false))
                  .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0))
                  .WithPayload(ConvertPayload(parameters.get("payload", null)))
                  .Build();

              _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
          }
          catch (Exception exception)
          {
              _logger.LogError(exception, "Error while publishing application message from server.");
          }
      }

      /// <summary>
      /// Converts a script supplied payload value into the raw bytes of an MQTT payload.
      /// </summary>
      /// <param name="payload">
      /// <c>null</c> (empty payload), a string (encoded as UTF-8), a <see cref="ByteArray"/>
      /// or a sequence of integers (each converted with <see cref="Convert.ToByte(int)"/>).
      /// </param>
      /// <exception cref="NotSupportedException">The payload has any other type.</exception>
      private static byte[] ConvertPayload(object payload)
      {
          switch (payload)
          {
              case null:
                  // Shared empty instance; avoids allocating a new zero-length array per message.
                  return Array.Empty<byte>();

              case string stringPayload:
                  return Encoding.UTF8.GetBytes(stringPayload);

              case ByteArray byteArray:
                  return byteArray.ToArray();

              case IEnumerable<int> intArray:
                  return intArray.Select(Convert.ToByte).ToArray();

              default:
                  throw new NotSupportedException("Payload type not supported.");
          }
      }

      /// <summary>
      /// Translates the application settings into MQTT server options: general limits,
      /// interceptors and storage, the unencrypted and encrypted TCP endpoints, the
      /// connection backlog and persistent sessions.
      /// </summary>
      private IMqttServerOptions CreateMqttServerOptions()
      {
          var options = new MqttServerOptionsBuilder()
              .WithMaxPendingMessagesPerClient(_settings.MaxPendingMessagesPerClient)
              .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(_settings.CommunicationTimeout))
              .WithConnectionValidator(_mqttConnectionValidator)
              .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
              .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
              .WithUnsubscriptionInterceptor(_mqttUnsubscriptionInterceptor)
              .WithStorage(_mqttServerStorage);

          // Configure unencrypted connections
          var tcpEndPoint = _settings.TcpEndPoint;
          if (tcpEndPoint.Enabled)
          {
              options.WithDefaultEndpoint();

              if (tcpEndPoint.TryReadIPv4(out var address4))
              {
                  options.WithDefaultEndpointBoundIPAddress(address4);
              }

              if (tcpEndPoint.TryReadIPv6(out var address6))
              {
                  options.WithDefaultEndpointBoundIPV6Address(address6);
              }

              // A port of 0 or less keeps the builder's default port.
              if (tcpEndPoint.Port > 0)
              {
                  options.WithDefaultEndpointPort(tcpEndPoint.Port);
              }
          }
          else
          {
              options.WithoutDefaultEndpoint();
          }

          // Configure encrypted connections
          var encryptedEndPoint = _settings.EncryptedTcpEndPoint;
          if (encryptedEndPoint.Enabled)
          {
              options
                  .WithEncryptedEndpoint()
                  .WithEncryptionSslProtocol(SslProtocols.Tls12);

              // Only the certificate itself is optional here; the endpoint was already
              // dereferenced by the Enabled check above.
              var certificate = encryptedEndPoint.Certificate;
              if (!string.IsNullOrEmpty(certificate?.Path))
              {
                  // Credentials stay null when no password is configured.
                  IMqttServerCertificateCredentials certificateCredentials = null;
                  if (!string.IsNullOrEmpty(certificate.Password))
                  {
                      certificateCredentials = new MqttServerCertificateCredentials
                      {
                          Password = certificate.Password
                      };
                  }

                  options.WithEncryptionCertificate(certificate.ReadCertificate(), certificateCredentials);
              }

              if (encryptedEndPoint.TryReadIPv4(out var address4))
              {
                  options.WithEncryptedEndpointBoundIPAddress(address4);
              }

              if (encryptedEndPoint.TryReadIPv6(out var address6))
              {
                  options.WithEncryptedEndpointBoundIPV6Address(address6);
              }

              // A port of 0 or less keeps the builder's default port.
              if (encryptedEndPoint.Port > 0)
              {
                  options.WithEncryptedEndpointPort(encryptedEndPoint.Port);
              }
          }
          else
          {
              options.WithoutEncryptedEndpoint();
          }

          if (_settings.ConnectionBacklog > 0)
          {
              options.WithConnectionBacklog(_settings.ConnectionBacklog);
          }

          if (_settings.EnablePersistentSessions)
          {
              options.WithPersistentSessions();
          }

          return options.Build();
      }
  }
  228. }