Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domuzīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

323 rindas
12 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.ManagedClient
  12. {
  13. public class ManagedMqttClient : IManagedMqttClient
  14. {
  // Storage wrapper: receives the configured storage in StartAsync and is told
  // about every message that is queued or successfully published.
  private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();

  // Application messages waiting to be published by the background publisher.
  private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();

  // Topic filters requested by the user; mutated under lock (_subscriptions).
  private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();

  // Underlying client and diagnostics sink, both supplied through the constructor.
  private readonly IMqttClient _mqttClient;
  private readonly IMqttNetLogger _logger;

  // Non-null between StartAsync and StopAsync; cancels the connection maintenance loop.
  private CancellationTokenSource _connectionCancellationToken;

  // Cancels the background publisher; reset to null when the connection is lost or the client is stopped.
  private CancellationTokenSource _publishingCancellationToken;

  // Options passed to StartAsync.
  private IManagedMqttClientOptions _options;

  // True when the subscription set changed (or a push failed) and must be sent to the server again.
  private bool _subscriptionsNotPushed;
  /// <summary>
  /// Creates a managed client on top of <paramref name="mqttClient"/> and hooks the
  /// client's Connected, Disconnected and ApplicationMessageReceived events so they
  /// can be forwarded through this instance.
  /// </summary>
  /// <param name="mqttClient">The underlying MQTT client.</param>
  /// <param name="logger">The logger used for diagnostics.</param>
  /// <exception cref="ArgumentNullException">Either argument is null.</exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  {
      // The logger is validated first, so it is the argument reported when both are null.
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      _logger = logger;
      _mqttClient = mqttClient;

      _mqttClient.Connected += OnConnected;
      _mqttClient.Disconnected += OnDisconnected;
      _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  }
  /// <summary>Gets the connection state reported by the underlying MQTT client.</summary>
  public bool IsConnected
  {
      get { return _mqttClient.IsConnected; }
  }

  /// <summary>Raised when the underlying client raises its Connected event.</summary>
  public event EventHandler<MqttClientConnectedEventArgs> Connected;

  /// <summary>Raised when the underlying client raises its Disconnected event.</summary>
  public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

  /// <summary>Raised when the underlying client raises its ApplicationMessageReceived event.</summary>
  public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  /// <summary>
  /// Starts the managed client: configures the storage, re-queues messages loaded
  /// from it and launches the background connection maintenance loop.
  /// </summary>
  /// <param name="options">The managed client options; ClientOptions must be set and use a clean session.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException">The client options are not set.</exception>
  /// <exception cref="NotSupportedException">The client options do not request a clean session.</exception>
  /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

      if (!options.ClientOptions.CleanSession)
      {
          throw new NotSupportedException("The managed client does not support existing sessions.");
      }

      if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

      _options = options;
      await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);

      if (_options.Storage != null)
      {
          // Messages persisted by an earlier run are queued again before connecting.
          var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
          foreach (var loadedMessage in loadedMessages)
          {
              _messageQueue.Add(loadedMessage);
          }
      }

      // Capture the token in a local: StopAsync may set the field to null before the
      // background delegate runs, and reading the field there would throw.
      var connectionCancellation = new CancellationTokenSource();
      var connectionToken = connectionCancellation.Token;
      _connectionCancellationToken = connectionCancellation;

      // Fire and forget: the loop runs until the token is cancelled by StopAsync.
      _ = Task.Run(async () => await MaintainConnectionAsync(connectionToken).ConfigureAwait(false), connectionToken);

      _logger.Info<ManagedMqttClient>("Started");
  }
  /// <summary>
  /// Stops the managed client: cancels the connection maintenance loop and the
  /// background publisher, then discards all messages still waiting in the queue.
  /// </summary>
  /// <returns>An already completed task.</returns>
  public Task StopAsync()
  {
      _connectionCancellationToken?.Cancel(false);
      _connectionCancellationToken = null;

      _publishingCancellationToken?.Cancel(false);
      _publishingCancellationToken = null;

      // Drain with the non-blocking TryTake. Checking Any() and then calling Take()
      // could block forever if the publisher removed the last item in between.
      MqttApplicationMessage discardedMessage;
      while (_messageQueue.TryTake(out discardedMessage))
      {
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Queues the given application messages for publishing. Each message is first
  /// handed to the storage manager and then added to the in-memory queue.
  /// </summary>
  /// <param name="applicationMessages">The messages to queue.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is null.</exception>
  public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  {
      if (applicationMessages == null)
      {
          throw new ArgumentNullException(nameof(applicationMessages));
      }

      foreach (var pendingMessage in applicationMessages)
      {
          // Store first, enqueue second, one message at a time.
          var storeTask = _storageManager.AddAsync(pendingMessage);
          await storeTask.ConfigureAwait(false);

          _messageQueue.Add(pendingMessage);
      }
  }
  /// <summary>
  /// Records the given topic filters in the local subscription set and flags the
  /// set as changed when at least one filter was not present before. This method
  /// itself does not contact the server.
  /// </summary>
  /// <param name="topicFilters">The topic filters to add.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              var isNewFilter = _subscriptions.Add(filter);
              if (!isNewFilter)
              {
                  continue;
              }

              _subscriptionsNotPushed = true;
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Removes the given topic filters from the local subscription set and flags the
  /// set as changed when at least one filter was actually removed. This method
  /// itself does not contact the server.
  /// </summary>
  /// <param name="topicFilters">The topic filters to remove.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  public Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      // Same argument validation as SubscribeAsync; without it a null argument
      // surfaced as a NullReferenceException from the foreach below.
      if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

      lock (_subscriptions)
      {
          foreach (var topicFilter in topicFilters)
          {
              if (_subscriptions.Remove(topicFilter))
              {
                  _subscriptionsNotPushed = true;
              }
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Background loop that keeps calling TryMaintainConnectionAsync until the token
  /// is cancelled, then disconnects the underlying client.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Expected when the loop is cancelled.
      }
      catch (Exception exception)
      {
          _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
      }
      finally
      {
          // This method runs as a fire-and-forget task, so an exception thrown by the
          // disconnect would go unobserved and skip the "Stopped" log entry.
          try
          {
              await _mqttClient.DisconnectAsync().ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Disconnecting while stopping failed.");
          }

          _logger.Info<ManagedMqttClient>("Stopped");
      }
  }
  /// <summary>
  /// Performs one iteration of connection maintenance: reconnects when required,
  /// pushes the subscriptions when they are out of date and (re)starts the
  /// background publisher after a successful reconnect. Exceptions are logged
  /// and never propagate to the caller.
  /// </summary>
  /// <param name="cancellationToken">Token that aborts the waiting periods.</param>
  private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
          if (connectionState == ReconnectionResult.NotConnected)
          {
              // No connection: stop the publisher and wait before the next attempt.
              _publishingCancellationToken?.Cancel(false);
              _publishingCancellationToken = null;

              await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
              return;
          }

          if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
          {
              await PushSubscriptionsAsync().ConfigureAwait(false);
          }

          if (connectionState == ReconnectionResult.Reconnected)
          {
              // Do not start a publisher that nobody could cancel after a stop request.
              cancellationToken.ThrowIfCancellationRequested();

              // Only a fresh connection needs a new publisher. Starting one for every
              // subscription change left the previous publisher running with a token
              // source that could no longer be cancelled.
              _publishingCancellationToken?.Cancel(false);

              // Capture the token in a local: the field may be reset to null by another
              // thread before the background delegate runs.
              var publishingCancellation = new CancellationTokenSource();
              var publishingToken = publishingCancellation.Token;
              _publishingCancellationToken = publishingCancellation;

              _ = Task.Run(async () => await PublishQueuedMessagesAsync(publishingToken).ConfigureAwait(false), publishingToken);
              return;
          }

          // Still connected: idle for a moment. This also acts as a back-off when
          // pushing the subscriptions failed and has to be retried. The parameter is
          // used instead of the field, which StopAsync may already have set to null.
          await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
      }
      catch (OperationCanceledException)
      {
          // Expected when the client is stopped while waiting.
      }
      catch (MqttCommunicationException exception)
      {
          _logger.Warning<ManagedMqttClient>(exception, "Communication exception while maintaining connection.");
      }
      catch (Exception exception)
      {
          _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Background loop that takes messages from the queue and publishes them one by
  /// one until the token is cancelled.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop and aborts a pending take.</param>
  private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              // Blocks until a message is available or the token is cancelled.
              var queuedMessage = _messageQueue.Take(cancellationToken);

              // NOTE(review): a message taken right before cancellation is neither
              // published nor put back into the queue - confirm this is intended.
              var shouldPublish = queuedMessage != null && !cancellationToken.IsCancellationRequested;
              if (shouldPublish)
              {
                  await TryPublishQueuedMessageAsync(queuedMessage).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
          // Expected when the publisher is cancelled while waiting for a message.
      }
      catch (Exception exception)
      {
          _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application messages.");
      }
      finally
      {
          _logger.Trace<ManagedMqttClient>("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes a single queued message and removes it from storage on success.
  /// After a communication failure, messages with a quality of service level above
  /// AtMostOnce are queued again; AtMostOnce messages are dropped and also removed
  /// from storage. Exceptions are logged and never propagate to the caller.
  /// </summary>
  /// <param name="message">The message to publish.</param>
  private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
  {
      var removeDroppedMessageFromStorage = false;

      try
      {
          await _mqttClient.PublishAsync(message).ConfigureAwait(false);
          await _storageManager.RemoveAsync(message).ConfigureAwait(false);
      }
      catch (MqttCommunicationException exception)
      {
          _logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed.");

          if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
          {
              _messageQueue.Add(message);
          }
          else
          {
              // The message is given up; without removing it from storage it would
              // be loaded and queued again by the next StartAsync.
              removeDroppedMessageFromStorage = true;
          }
      }
      catch (Exception exception)
      {
          _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message.");
      }

      if (!removeDroppedMessageFromStorage)
      {
          return;
      }

      try
      {
          await _storageManager.RemoveAsync(message).ConfigureAwait(false);
      }
      catch (Exception exception)
      {
          // Keep the "never throws" contract of this method.
          _logger.Error<ManagedMqttClient>(exception, "Removing dropped application message from storage failed.");
      }
  }
  /// <summary>
  /// Sends a snapshot of the local subscription set to the server. When the call
  /// fails the set is flagged as not pushed so that it is sent again later.
  /// Only a subscribe call is issued here; filters removed locally are not
  /// unsubscribed on the server by this method.
  /// </summary>
  private async Task PushSubscriptionsAsync()
  {
      // The text is passed as the message, matching the other logger calls in this class.
      _logger.Info<ManagedMqttClient>("Synchronizing subscriptions");

      List<TopicFilter> subscriptions;
      lock (_subscriptions)
      {
          subscriptions = _subscriptions.ToList();
          _subscriptionsNotPushed = false;
      }

      // Inspect the snapshot, not the shared set: enumerating _subscriptions outside
      // the lock races with concurrent SubscribeAsync / UnsubscribeAsync calls.
      if (subscriptions.Count == 0)
      {
          return;
      }

      try
      {
          await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
      }
      catch (Exception exception)
      {
          _logger.Warning<ManagedMqttClient>(exception, "Synchronizing subscriptions failed");
          _subscriptionsNotPushed = true;
      }
  }
  /// <summary>
  /// Connects the underlying client when it is not connected.
  /// </summary>
  /// <returns>
  /// StillConnected when the client was already connected, Reconnected when a new
  /// connection was established, NotConnected when connecting failed.
  /// </returns>
  private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  {
      if (_mqttClient.IsConnected)
      {
          return ReconnectionResult.StillConnected;
      }

      try
      {
          await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
          return ReconnectionResult.Reconnected;
      }
      catch (Exception exception)
      {
          // The caller retries after a delay; log the reason instead of swallowing
          // it silently so that connection problems can be diagnosed.
          _logger.Warning<ManagedMqttClient>(exception, "Connecting failed.");
          return ReconnectionResult.NotConnected;
      }
  }
  /// <summary>
  /// Forwards a message received by the underlying client to the subscribers of
  /// ApplicationMessageReceived, with this instance as the sender.
  /// </summary>
  private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  {
      var handler = ApplicationMessageReceived;
      if (handler == null)
      {
          return;
      }

      handler(this, eventArgs);
  }
  /// <summary>
  /// Forwards the Disconnected event of the underlying client to the subscribers
  /// of Disconnected, with this instance as the sender.
  /// </summary>
  private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
  {
      var handler = Disconnected;
      if (handler == null)
      {
          return;
      }

      handler(this, eventArgs);
  }
  /// <summary>
  /// Forwards the Connected event of the underlying client to the subscribers of
  /// Connected, with this instance as the sender.
  /// </summary>
  private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
  {
      var handler = Connected;
      if (handler == null)
      {
          return;
      }

      handler(this, eventArgs);
  }
  272. }
  273. }