No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

MqttServerService.cs 11 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Security.Authentication;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. using IronPython.Runtime;
  9. using Microsoft.AspNetCore.Http;
  10. using Microsoft.Extensions.Logging;
  11. using MQTTnet.Adapter;
  12. using MQTTnet.AspNetCore;
  13. using MQTTnet.Client.Publishing;
  14. using MQTTnet.Implementations;
  15. using MQTTnet.Protocol;
  16. using MQTTnet.Server.Configuration;
  17. using MQTTnet.Server.Scripting;
  18. using MQTTnet.Server.Status;
  19. namespace MQTTnet.Server.Mqtt
  20. {
  21. public class MqttServerService
  22. {
  23. private readonly ILogger<MqttServerService> _logger;
  24. private readonly MqttSettingsModel _settings;
  25. private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor;
  26. private readonly MqttServerStorage _mqttServerStorage;
  27. private readonly MqttClientConnectedHandler _mqttClientConnectedHandler;
  28. private readonly MqttClientDisconnectedHandler _mqttClientDisconnectedHandler;
  29. private readonly MqttClientSubscribedTopicHandler _mqttClientSubscribedTopicHandler;
  30. private readonly MqttClientUnsubscribedTopicHandler _mqttClientUnsubscribedTopicHandler;
  31. private readonly MqttServerConnectionValidator _mqttConnectionValidator;
  32. private readonly IMqttServer _mqttServer;
  33. private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor;
  34. private readonly PythonScriptHostService _pythonScriptHostService;
  35. private readonly MqttWebSocketServerAdapter _webSocketServerAdapter;
  36. public MqttServerService(
  37. MqttSettingsModel mqttSettings,
  38. CustomMqttFactory mqttFactory,
  39. MqttClientConnectedHandler mqttClientConnectedHandler,
  40. MqttClientDisconnectedHandler mqttClientDisconnectedHandler,
  41. MqttClientSubscribedTopicHandler mqttClientSubscribedTopicHandler,
  42. MqttClientUnsubscribedTopicHandler mqttClientUnsubscribedTopicHandler,
  43. MqttServerConnectionValidator mqttConnectionValidator,
  44. MqttSubscriptionInterceptor mqttSubscriptionInterceptor,
  45. MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor,
  46. MqttServerStorage mqttServerStorage,
  47. PythonScriptHostService pythonScriptHostService,
  48. ILogger<MqttServerService> logger)
  49. {
  50. _settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings));
  51. _mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler));
  52. _mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler));
  53. _mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler));
  54. _mqttClientUnsubscribedTopicHandler = mqttClientUnsubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientUnsubscribedTopicHandler));
  55. _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator));
  56. _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor));
  57. _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor));
  58. _mqttServerStorage = mqttServerStorage ?? throw new ArgumentNullException(nameof(mqttServerStorage));
  59. _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService));
  60. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  61. _webSocketServerAdapter = new MqttWebSocketServerAdapter(mqttFactory.Logger.CreateChildLogger());
  62. var adapters = new List<IMqttServerAdapter>
  63. {
  64. new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger())
  65. {
  66. TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services.
  67. },
  68. _webSocketServerAdapter
  69. };
  70. _mqttServer = mqttFactory.CreateMqttServer(adapters);
  71. }
  72. public void Configure()
  73. {
  74. _pythonScriptHostService.RegisterProxyObject("publish", new Action<PythonDictionary>(Publish));
  75. _mqttServerStorage.Configure();
  76. _mqttServer.ClientConnectedHandler = _mqttClientConnectedHandler;
  77. _mqttServer.ClientDisconnectedHandler = _mqttClientDisconnectedHandler;
  78. _mqttServer.ClientSubscribedTopicHandler = _mqttClientSubscribedTopicHandler;
  79. _mqttServer.ClientUnsubscribedTopicHandler = _mqttClientUnsubscribedTopicHandler;
  80. _mqttServer.StartAsync(CreateMqttServerOptions()).GetAwaiter().GetResult();
  81. _logger.LogInformation("MQTT server started.");
  82. }
  83. public Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext httpContext)
  84. {
  85. return _webSocketServerAdapter.RunWebSocketConnectionAsync(webSocket, httpContext);
  86. }
  87. public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
  88. {
  89. return _mqttServer.GetClientStatusAsync();
  90. }
  91. public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
  92. {
  93. return _mqttServer.GetSessionStatusAsync();
  94. }
  95. public Task ClearRetainedApplicationMessagesAsync()
  96. {
  97. return _mqttServer.ClearRetainedApplicationMessagesAsync();
  98. }
  99. public Task<IList<MqttApplicationMessage>> GetRetainedApplicationMessagesAsync()
  100. {
  101. return _mqttServer.GetRetainedApplicationMessagesAsync();
  102. }
  103. public Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
  104. {
  105. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  106. return _mqttServer.PublishAsync(applicationMessage);
  107. }
  108. private void Publish(PythonDictionary parameters)
  109. {
  110. try
  111. {
  112. var applicationMessageBuilder = new MqttApplicationMessageBuilder()
  113. .WithTopic((string)parameters.get("topic", null))
  114. .WithRetainFlag((bool)parameters.get("retain", false))
  115. .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0));
  116. var payload = parameters.get("payload", null);
  117. byte[] binaryPayload;
  118. if (payload == null)
  119. {
  120. binaryPayload = new byte[0];
  121. }
  122. else if (payload is string stringPayload)
  123. {
  124. binaryPayload = Encoding.UTF8.GetBytes(stringPayload);
  125. }
  126. else if (payload is ByteArray byteArray)
  127. {
  128. binaryPayload = byteArray.ToArray();
  129. }
  130. else if (payload is IEnumerable<int> intArray)
  131. {
  132. binaryPayload = intArray.Select(Convert.ToByte).ToArray();
  133. }
  134. else
  135. {
  136. throw new NotSupportedException("Payload type not supported.");
  137. }
  138. applicationMessageBuilder = applicationMessageBuilder
  139. .WithPayload(binaryPayload);
  140. var applicationMessage = applicationMessageBuilder.Build();
  141. _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult();
  142. }
  143. catch (Exception exception)
  144. {
  145. _logger.LogError(exception, "Error while publishing application message from server.");
  146. }
  147. }
  148. private IMqttServerOptions CreateMqttServerOptions()
  149. {
  150. var options = new MqttServerOptionsBuilder()
  151. .WithMaxPendingMessagesPerClient(_settings.MaxPendingMessagesPerClient)
  152. .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(_settings.CommunicationTimeout))
  153. .WithConnectionValidator(_mqttConnectionValidator)
  154. .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor)
  155. .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor)
  156. .WithStorage(_mqttServerStorage);
  157. // Configure unencrypted connections
  158. if (_settings.TcpEndPoint.Enabled)
  159. {
  160. options.WithDefaultEndpoint();
  161. if (_settings.TcpEndPoint.TryReadIPv4(out var address4))
  162. {
  163. options.WithDefaultEndpointBoundIPAddress(address4);
  164. }
  165. if (_settings.TcpEndPoint.TryReadIPv6(out var address6))
  166. {
  167. options.WithDefaultEndpointBoundIPV6Address(address6);
  168. }
  169. if (_settings.TcpEndPoint.Port > 0)
  170. {
  171. options.WithDefaultEndpointPort(_settings.TcpEndPoint.Port);
  172. }
  173. }
  174. else
  175. {
  176. options.WithoutDefaultEndpoint();
  177. }
  178. // Configure encrypted connections
  179. if (_settings.EncryptedTcpEndPoint.Enabled)
  180. {
  181. options
  182. .WithEncryptedEndpoint()
  183. .WithEncryptionSslProtocol(SslProtocols.Tls12);
  184. if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Path))
  185. {
  186. IMqttServerCertificateCredentials certificateCredentials = null;
  187. if (!string.IsNullOrEmpty(_settings.EncryptedTcpEndPoint?.Certificate?.Password))
  188. {
  189. certificateCredentials = new MqttServerCertificateCredentials
  190. {
  191. Password = _settings.EncryptedTcpEndPoint.Certificate.Password
  192. };
  193. }
  194. options.WithEncryptionCertificate(_settings.EncryptedTcpEndPoint.Certificate.ReadCertificate(), certificateCredentials);
  195. }
  196. if (_settings.EncryptedTcpEndPoint.TryReadIPv4(out var address4))
  197. {
  198. options.WithEncryptedEndpointBoundIPAddress(address4);
  199. }
  200. if (_settings.EncryptedTcpEndPoint.TryReadIPv6(out var address6))
  201. {
  202. options.WithEncryptedEndpointBoundIPV6Address(address6);
  203. }
  204. if (_settings.EncryptedTcpEndPoint.Port > 0)
  205. {
  206. options.WithEncryptedEndpointPort(_settings.EncryptedTcpEndPoint.Port);
  207. }
  208. }
  209. else
  210. {
  211. options.WithoutEncryptedEndpoint();
  212. }
  213. if (_settings.ConnectionBacklog > 0)
  214. {
  215. options.WithConnectionBacklog(_settings.ConnectionBacklog);
  216. }
  217. if (_settings.EnablePersistentSessions)
  218. {
  219. options.WithPersistentSessions();
  220. }
  221. return options.Build();
  222. }
  223. }
  224. }