You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

304 regels
11 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Core.Client;
  8. using MQTTnet.Core.Exceptions;
  9. using MQTTnet.Core.Protocol;
  10. using Microsoft.Extensions.Logging;
  11. namespace MQTTnet.Core.ManagedClient
  12. {
  13. public class ManagedMqttClient : IManagedMqttClient
  14. {
  // --- Collaborators and queues created once per instance ---

  // Receives the configured storage in StartAsync and is notified about every
  // message that PublishAsync enqueues or the publishing loop finishes with.
  private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();

  // Outgoing messages: filled by StartAsync (loaded from storage) and PublishAsync,
  // consumed by PublishQueuedMessagesAsync.
  private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();

  // Topic filters requested through SubscribeAsync/UnsubscribeAsync; those methods
  // mutate the set while holding lock (_subscriptions).
  private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();

  // The wrapped client that performs the actual connect/subscribe/publish calls.
  private readonly IMqttClient _mqttClient;

  private readonly ILogger<ManagedMqttClient> _logger;

  // --- Mutable run state ---

  // Non-null while the client counts as started (StartAsync checks it); cancelled by StopAsync.
  private CancellationTokenSource _connectionCancellationToken;

  // Token source handed to the publishing task started from MaintainConnectionAsync.
  private CancellationTokenSource _publishingCancellationToken;

  // Options passed to the most recent StartAsync call.
  private IManagedMqttClientOptions _options;

  // Set when _subscriptions changed; cleared by PushSubscriptionsAsync once it has taken a snapshot.
  private bool _subscriptionsNotPushed;
  /// <summary>
  /// Creates a managed client around <paramref name="mqttClient"/> and forwards its
  /// Connected, Disconnected and ApplicationMessageReceived events.
  /// </summary>
  /// <param name="logger">Logger used for status and failure messages.</param>
  /// <param name="mqttClient">The client to wrap.</param>
  /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
  public ManagedMqttClient(ILogger<ManagedMqttClient> logger, IMqttClient mqttClient)
  {
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      _logger = logger;
      _mqttClient = mqttClient;

      // Relay the inner client's events through this instance's own events.
      _mqttClient.Connected += OnConnected;
      _mqttClient.Disconnected += OnDisconnected;
      _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  }
  /// <summary>Gets the connection state reported by the wrapped MQTT client.</summary>
  public bool IsConnected
  {
      get { return _mqttClient.IsConnected; }
  }

  /// <summary>Raised when the wrapped client raises its Connected event.</summary>
  public event EventHandler<MqttClientConnectedEventArgs> Connected;

  /// <summary>Raised when the wrapped client raises its Disconnected event.</summary>
  public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

  /// <summary>Raised when the wrapped client raises its ApplicationMessageReceived event.</summary>
  public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  /// <summary>
  /// Starts the managed client: stores the options, hands the configured storage to the
  /// storage manager, enqueues any messages loaded from that storage and launches the
  /// background loop that keeps the connection alive.
  /// </summary>
  /// <param name="options">Managed client options; <c>ClientOptions</c> must be set and use a clean session.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentException"><c>options.ClientOptions</c> is <c>null</c>.</exception>
  /// <exception cref="NotSupportedException"><c>CleanSession</c> is disabled.</exception>
  /// <exception cref="InvalidOperationException">The client is already started.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

      if (!options.ClientOptions.CleanSession)
      {
          throw new NotSupportedException("The managed client does not support existing sessions.");
      }

      if (_connectionCancellationToken != null)
      {
          throw new InvalidOperationException("The managed client is already started.");
      }

      _options = options;
      await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);

      if (_options.Storage != null)
      {
          var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
          foreach (var loadedMessage in loadedMessages)
          {
              _messageQueue.Add(loadedMessage);
          }
      }

      // Capture the token in a local before starting the background work. Reading the
      // field from inside the lambda would throw a NullReferenceException when StopAsync
      // clears the field before the lambda gets to run.
      var connectionCancellation = new CancellationTokenSource();
      var cancellationToken = connectionCancellation.Token;
      _connectionCancellationToken = connectionCancellation;

      // Task.Run unwraps the async delegate. StartNew + LongRunning would spin up a
      // dedicated thread that is released again at the loop's first await.
      // The task is intentionally not awaited; the loop logs its own failures.
      _ = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);

      _logger.LogInformation("Started");
  }
  /// <summary>
  /// Stops the managed client: cancels the connection maintenance loop, marks the client
  /// as not started and discards all messages still waiting in the in-memory queue.
  /// </summary>
  /// <returns>An already completed task.</returns>
  public Task StopAsync()
  {
      _connectionCancellationToken?.Cancel(false);
      _connectionCancellationToken = null;

      // TryTake never blocks. The previous Any()/Take() pair could hang forever when the
      // publishing task removed the last message between the check and the Take() call.
      while (_messageQueue.TryTake(out _))
      {
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Queues the given messages for publishing. Each message is first passed to the
  /// storage manager and then appended to the in-memory queue.
  /// </summary>
  /// <param name="applicationMessages">The messages to enqueue, processed in enumeration order.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is <c>null</c>.</exception>
  public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  {
      if (applicationMessages == null)
      {
          throw new ArgumentNullException(nameof(applicationMessages));
      }

      foreach (var pendingMessage in applicationMessages)
      {
          // Storage first, queue second - same order for every message.
          var storeTask = _storageManager.AddAsync(pendingMessage);
          await storeTask.ConfigureAwait(false);

          _messageQueue.Add(pendingMessage);
      }
  }
  /// <summary>
  /// Adds the given topic filters to the local subscription set and flags the set as
  /// changed when at least one filter was not present before.
  /// </summary>
  /// <param name="topicFilters">The topic filters to add.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
  public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              // Add returns true only for filters that were not in the set yet;
              // |= keeps an earlier "true" and always evaluates the Add call.
              _subscriptionsNotPushed |= _subscriptions.Add(filter);
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Removes the given topic filters from the local subscription set and flags the set as
  /// changed when at least one filter was actually removed.
  /// </summary>
  /// <param name="topicFilters">The topic filters to remove.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
  public Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      // Same argument validation as SubscribeAsync; without it a null argument surfaced
      // as a NullReferenceException from inside the lock.
      if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

      lock (_subscriptions)
      {
          foreach (var topicFilter in topicFilters)
          {
              if (_subscriptions.Remove(topicFilter))
              {
                  _subscriptionsNotPushed = true;
              }
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Background loop that keeps the wrapped client connected until
  /// <paramref name="cancellationToken"/> is cancelled. While disconnected it stops the
  /// publishing task and retries after the configured reconnect delay. While connected it
  /// pushes pending subscriptions and makes sure exactly one publishing task is running.
  /// </summary>
  /// <param name="cancellationToken">Cancelled to end the loop.</param>
  private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
              if (connectionState == ReconnectionResult.NotConnected)
              {
                  _publishingCancellationToken?.Cancel(false);
                  _publishingCancellationToken = null;

                  await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  continue;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await PushSubscriptionsAsync().ConfigureAwait(false);
              }

              // Start a publisher only when none is running. Creating a new one on every
              // subscription change (or on a reconnect that never passed through the
              // NotConnected branch) left the previous publisher running with a token
              // nobody could cancel any more.
              if (_publishingCancellationToken == null)
              {
                  var publishingCancellation = new CancellationTokenSource();
                  var publishingToken = publishingCancellation.Token;
                  _publishingCancellationToken = publishingCancellation;

                  // The token is captured in a local so the lambda never reads a field
                  // that may already have been reset. Fire-and-forget on purpose.
                  _ = Task.Factory.StartNew(() => PublishQueuedMessagesAsync(publishingToken), publishingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
              }

              // Poll interval while connected. It also throttles retries of a failed
              // subscription push. Uses the method's own token: the field is set to null
              // by StopAsync and must not be dereferenced here.
              // Consider using the _Disconnected_ event here. (TaskCompletionSource)
              await Task.Delay(100, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Expected when the loop is stopped through the cancellation token.
      }
      catch (MqttCommunicationException exception)
      {
          _logger.LogWarning(new EventId(), exception, "Communication exception while maintaining connection.");
      }
      catch (Exception exception)
      {
          _logger.LogError(new EventId(), exception, "Unhandled exception while maintaining connection.");
      }
      finally
      {
          // The publisher must not outlive the connection loop.
          _publishingCancellationToken?.Cancel(false);
          _publishingCancellationToken = null;

          try
          {
              await _mqttClient.DisconnectAsync().ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              // This method runs as an unobserved background task, so an exception
              // escaping here would vanish without any trace.
              _logger.LogWarning(new EventId(), exception, "Disconnecting failed while stopping.");
          }

          _logger.LogInformation("Stopped");
      }
  }
  /// <summary>
  /// Publishing loop: takes messages from the queue one at a time and publishes them
  /// until <paramref name="cancellationToken"/> is cancelled. A message is removed from
  /// the storage manager only once the client is finished with it (published or dropped).
  /// </summary>
  /// <param name="cancellationToken">Cancelled to stop publishing.</param>
  private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              // Blocks until a message is available or the token is cancelled.
              var message = _messageQueue.Take(cancellationToken);
              if (message == null)
              {
                  continue;
              }

              if (cancellationToken.IsCancellationRequested)
              {
                  // The message was already taken; put it back so it is not silently
                  // lost from the in-memory queue when publishing stops.
                  _messageQueue.Add(message);
                  break;
              }

              var isFinished = await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              if (isFinished)
              {
                  // A re-queued message stays in storage. Removing it here meant a
                  // restart before the retry succeeded lost the message.
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
          // Expected when publishing is stopped through the cancellation token.
      }
      finally
      {
          _logger.LogInformation("Stopped publishing messages");
      }
  }

  /// <summary>
  /// Publishes a single queued message through the wrapped client. On a communication
  /// failure, messages with a quality of service level above AtMostOnce are put back
  /// into the queue; all other failures are logged and the message is dropped.
  /// </summary>
  /// <param name="message">The message to publish.</param>
  /// <returns>
  /// <c>true</c> when the message was published or dropped; <c>false</c> when it was
  /// re-queued for another attempt.
  /// </returns>
  private async Task<bool> TryPublishQueuedMessageAsync(MqttApplicationMessage message)
  {
      try
      {
          await _mqttClient.PublishAsync(message).ConfigureAwait(false);
          return true;
      }
      catch (MqttCommunicationException exception)
      {
          _logger.LogWarning(new EventId(), exception, "Publishing application message failed.");

          if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
          {
              _messageQueue.Add(message);
              return false;
          }

          return true;
      }
      catch (Exception exception)
      {
          _logger.LogError(new EventId(), exception, "Unhandled exception while publishing queued application message.");
          return true;
      }
  }
  /// <summary>
  /// Sends a snapshot of the local subscription set to the wrapped client. The
  /// "not pushed" flag is cleared when the snapshot is taken and set again when the
  /// subscribe call fails, so that the connection loop retries.
  /// </summary>
  private async Task PushSubscriptionsAsync()
  {
      // The message has to be the first argument. Passing nameof(ManagedMqttClient) first
      // turned the class name into the message template and the real text into an unused
      // format argument.
      _logger.LogInformation("Synchronizing subscriptions");

      List<TopicFilter> subscriptions;
      lock (_subscriptions)
      {
          subscriptions = _subscriptions.ToList();
          _subscriptionsNotPushed = false;
      }

      // Check the snapshot, not the shared set: enumerating _subscriptions outside the
      // lock races with SubscribeAsync/UnsubscribeAsync.
      if (subscriptions.Count == 0)
      {
          return;
      }

      try
      {
          await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
      }
      catch (Exception exception)
      {
          _logger.LogWarning(new EventId(), exception, "Synchronizing subscriptions failed");
          _subscriptionsNotPushed = true;
      }
  }
  /// <summary>
  /// Connects the wrapped client with the stored client options unless it already
  /// reports a connection.
  /// </summary>
  /// <returns>
  /// <c>StillConnected</c> when the client was already connected, <c>Reconnected</c> when
  /// a connect attempt succeeded, <c>NotConnected</c> when the attempt threw.
  /// </returns>
  private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  {
      if (_mqttClient.IsConnected)
      {
          return ReconnectionResult.StillConnected;
      }

      try
      {
          await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
          return ReconnectionResult.Reconnected;
      }
      catch (Exception exception)
      {
          // Failure is still reported through the return value, but the cause is no
          // longer discarded - without it a wrong host or credential is invisible.
          _logger.LogWarning(new EventId(), exception, "Connecting to the server failed.");
          return ReconnectionResult.NotConnected;
      }
  }
  // Relays a message received by the wrapped client, with this instance as the sender.
  private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      => ApplicationMessageReceived?.Invoke(this, eventArgs);
  // Relays the wrapped client's Disconnected event, with this instance as the sender.
  private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
      => Disconnected?.Invoke(this, eventArgs);
  // Relays the wrapped client's Connected event, with this instance as the sender.
  private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      => Connected?.Invoke(this, eventArgs);
  255. }
  256. }