using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.ManagedClient
{
    public class ManagedMqttClient : IManagedMqttClient
    {
        // Outgoing messages wait here until the publishing task sends them.
        private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>();

        // Desired subscription state; _subscriptions also serves as the lock object for both collections.
        private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
        private readonly HashSet<string> _unsubscriptions = new HashSet<string>();

        private readonly IMqttClient _mqttClient;
        private readonly IMqttNetChildLogger _logger;

        private CancellationTokenSource _connectionCancellationToken;
        private CancellationTokenSource _publishingCancellationToken;
        private ManagedMqttClientStorageManager _storageManager;
        private IManagedMqttClientOptions _options;

        // Set whenever the subscription state changes and still has to be sent to the broker.
        private bool _subscriptionsNotPushed;

        public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
        {
            if (logger == null) throw new ArgumentNullException(nameof(logger));

            _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

            // Forward the events of the wrapped client through this instance.
            _mqttClient.Connected += OnConnected;
            _mqttClient.Disconnected += OnDisconnected;
            _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;

            _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
        }

        public bool IsConnected => _mqttClient.IsConnected;
        public bool IsStarted => _connectionCancellationToken != null;
        public int PendingApplicationMessagesCount => _messageQueue.Count;

        public event EventHandler<MqttClientConnectedEventArgs> Connected;
        public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

        public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
        public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;

        public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
        public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;

        public async Task StartAsync(IManagedMqttClientOptions options)
        {
            if (options == null) throw new ArgumentNullException(nameof(options));
            if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

            if (!options.ClientOptions.CleanSession)
            {
                throw new NotSupportedException("The managed client does not support existing sessions.");
            }

            if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

            _options = options;

            // Restore messages that were queued but not sent in a previous run.
            if (_options.Storage != null)
            {
                _storageManager = new ManagedMqttClientStorageManager(_options.Storage);
                var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

                foreach (var message in messages)
                {
                    _messageQueue.Add(message);
                }
            }

            _connectionCancellationToken = new CancellationTokenSource();

            // Capture the token in a local so the background task never reads the
            // field after StopAsync has already reset it to null.
            var cancellationToken = _connectionCancellationToken.Token;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
            Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

            _logger.Info("Started");
        }

        public Task StopAsync()
        {
            StopPublishing();
            StopMaintainingConnection();

            // Drain the queue without blocking. Take() could block indefinitely
            // if another thread removed the last item after the emptiness check.
            while (_messageQueue.TryTake(out _))
            {
            }

            return Task.FromResult(0);
        }

        public Task PublishAsync(MqttApplicationMessage applicationMessage)
        {
            if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

            return PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build());
        }

        public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
        {
            if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

            // Persist first so the message is not lost if the process stops before it is sent.
            if (_storageManager != null)
            {
                await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
            }

            _messageQueue.Add(applicationMessage);
        }

        public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
        {
            if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

            // Only the desired state is recorded here; the connection loop pushes it to the broker.
            lock (_subscriptions)
            {
                foreach (var topicFilter in topicFilters)
                {
                    _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
                    _subscriptionsNotPushed = true;
                }
            }

            return Task.FromResult(0);
        }

        public Task UnsubscribeAsync(IEnumerable<string> topics)
        {
            if (topics == null) throw new ArgumentNullException(nameof(topics));

            lock (_subscriptions)
            {
                foreach (var topic in topics)
                {
                    // Only topics that were actually subscribed need an unsubscribe request.
                    if (_subscriptions.Remove(topic))
                    {
                        _unsubscriptions.Add(topic);
                        _subscriptionsNotPushed = true;
                    }
                }
            }

            return Task.FromResult(0);
        }

        public void Dispose()
        {
            _messageQueue?.Dispose();
            _connectionCancellationToken?.Dispose();
            _publishingCancellationToken?.Dispose();
        }

        private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the client is stopped.
            }
            catch (Exception exception)
            {
                _logger.Error(exception, "Unhandled exception while maintaining connection.");
            }
            finally
            {
                await _mqttClient.DisconnectAsync().ConfigureAwait(false);
                _logger.Info("Stopped");
            }
        }

        private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
        {
            try
            {
                var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);

                // Connecting failed: stop publishing and wait before the next attempt.
                if (connectionState == ReconnectionResult.NotConnected)
                {
                    StopPublishing();
                    await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                    return;
                }

                // Fresh connection or changed subscriptions: push the state and (re)start publishing.
                if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
                {
                    await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
                    StartPublishing();
                    return;
                }

                // Nothing to do: wait until the next connection check.
                if (connectionState == ReconnectionResult.StillConnected)
                {
                    await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the client is stopped; the outer loop checks the token.
            }
            catch (MqttCommunicationException exception)
            {
                _logger.Warning(exception, "Communication exception while maintaining connection.");
            }
            catch (Exception exception)
            {
                _logger.Error(exception, "Unhandled exception while maintaining connection.");
            }
        }

        private void PublishQueuedMessages(CancellationToken cancellationToken)
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    // Blocks until a message is available or the token is cancelled.
                    var message = _messageQueue.Take(cancellationToken);
                    if (message == null)
                    {
                        continue;
                    }

                    cancellationToken.ThrowIfCancellationRequested();

                    TryPublishQueuedMessage(message);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when publishing is stopped.
            }
            catch (Exception exception)
            {
                _logger.Error(exception, "Unhandled exception while publishing queued application messages.");
            }
            finally
            {
                _logger.Verbose("Stopped publishing messages.");
            }
        }

        private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)
        {
            Exception transmitException = null;
            try
            {
                // This runs on a dedicated long-running task, so blocking on the async calls is intentional.
                _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
                _storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
            }
            catch (MqttCommunicationException exception)
            {
                transmitException = exception;

                _logger.Warning(exception, $"Publishing application message ({message.Id}) failed.");

                // Re-queue messages with QoS 1 or 2 for another attempt; QoS 0 messages are dropped.
                if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
                {
                    _messageQueue.Add(message);
                }
            }
            catch (Exception exception)
            {
                transmitException = exception;
                _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id}).");
            }
            finally
            {
                ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
            }
        }

        private async Task SynchronizeSubscriptionsAsync()
        {
            // The child logger already carries the source name; pass only the message.
            _logger.Info("Synchronizing subscriptions");

            List<TopicFilter> subscriptions;
            HashSet<string> unsubscriptions;

            // Take a snapshot under the lock so the network calls run without holding it.
            lock (_subscriptions)
            {
                subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();

                unsubscriptions = new HashSet<string>(_unsubscriptions);
                _unsubscriptions.Clear();

                _subscriptionsNotPushed = false;
            }

            if (!subscriptions.Any() && !unsubscriptions.Any())
            {
                return;
            }

            try
            {
                if (subscriptions.Any())
                {
                    await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
                }

                if (unsubscriptions.Any())
                {
                    await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
                }
            }
            catch (Exception exception)
            {
                _logger.Warning(exception, "Synchronizing subscriptions failed.");

                // Retry on the next loop iteration.
                _subscriptionsNotPushed = true;

                SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
            }
        }

        private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
        {
            if (_mqttClient.IsConnected)
            {
                return ReconnectionResult.StillConnected;
            }

            try
            {
                await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
                return ReconnectionResult.Reconnected;
            }
            catch (Exception exception)
            {
                ConnectingFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
                return ReconnectionResult.NotConnected;
            }
        }

        private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
        {
            ApplicationMessageReceived?.Invoke(this, eventArgs);
        }

        private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
        {
            Disconnected?.Invoke(this, eventArgs);
        }

        private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
        {
            Connected?.Invoke(this, eventArgs);
        }

        private void StartPublishing()
        {
            if (_publishingCancellationToken != null)
            {
                StopPublishing();
            }

            var cts = new CancellationTokenSource();

            // Read the token once, before the task starts. StopPublishing() disposes the
            // source, and reading cts.Token inside the task after that would throw.
            var cancellationToken = cts.Token;

            _publishingCancellationToken = cts;

            Task.Factory.StartNew(() => PublishQueuedMessages(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        private void StopPublishing()
        {
            _publishingCancellationToken?.Cancel(false);
            _publishingCancellationToken?.Dispose();
            _publishingCancellationToken = null;
        }

        private void StopMaintainingConnection()
        {
            _connectionCancellationToken?.Cancel(false);
            _connectionCancellationToken?.Dispose();
            _connectionCancellationToken = null;
        }
    }
}