您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

ManagedMqttClient.cs 17 KiB

6 年前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Diagnostics;
  8. using MQTTnet.Exceptions;
  9. using MQTTnet.Internal;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Server;
  12. using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
  13. using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
  14. namespace MQTTnet.Extensions.ManagedClient
  15. {
  16. public class ManagedMqttClient : IManagedMqttClient
  17. {
  18. private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  19. private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  20. private readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  21. private readonly IMqttClient _mqttClient;
  22. private readonly IMqttNetChildLogger _logger;
  23. private CancellationTokenSource _connectionCancellationToken;
  24. private CancellationTokenSource _publishingCancellationToken;
  25. private ManagedMqttClientStorageManager _storageManager;
  26. private IManagedMqttClientOptions _options;
  27. private bool _subscriptionsNotPushed;
  28. public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
  29. {
  30. if (logger == null) throw new ArgumentNullException(nameof(logger));
  31. _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  32. _mqttClient.Connected += OnConnected;
  33. _mqttClient.Disconnected += OnDisconnected;
  34. _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  35. _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  36. }
  37. public bool IsConnected => _mqttClient.IsConnected;
  38. public bool IsStarted => _connectionCancellationToken != null;
  39. public int PendingApplicationMessagesCount => _messageQueue.Count;
  40. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  41. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  42. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  43. public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
  44. public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;
  45. public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
  46. public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
  47. public async Task StartAsync(IManagedMqttClientOptions options)
  48. {
  49. if (options == null) throw new ArgumentNullException(nameof(options));
  50. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  51. if (!options.ClientOptions.CleanSession)
  52. {
  53. throw new NotSupportedException("The managed client does not support existing sessions.");
  54. }
  55. if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");
  56. _options = options;
  57. if (_options.Storage != null)
  58. {
  59. _storageManager = new ManagedMqttClientStorageManager(_options.Storage);
  60. var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
  61. foreach (var message in messages)
  62. {
  63. _messageQueue.Enqueue(message);
  64. }
  65. }
  66. _connectionCancellationToken = new CancellationTokenSource();
  67. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  68. Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token);
  69. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  70. _logger.Info("Started");
  71. }
  72. public Task StopAsync()
  73. {
  74. StopPublishing();
  75. StopMaintainingConnection();
  76. _messageQueue.Clear();
  77. return Task.FromResult(0);
  78. }
  79. public Task PublishAsync(MqttApplicationMessage applicationMessage)
  80. {
  81. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  82. return PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build());
  83. }
  84. public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  85. {
  86. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  87. ManagedMqttApplicationMessage removedMessage = null;
  88. lock (_messageQueue)
  89. {
  90. if (_messageQueue.Count >= _options.MaxPendingMessages)
  91. {
  92. if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  93. {
  94. _logger.Verbose("Skipping publish of new application message because internal queue is full.");
  95. ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
  96. return;
  97. }
  98. if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  99. {
  100. removedMessage = _messageQueue.RemoveFirst();
  101. _logger.Verbose("Removed oldest application message from internal queue because it is full.");
  102. ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(removedMessage));
  103. }
  104. }
  105. _messageQueue.Enqueue(applicationMessage);
  106. }
  107. if (_storageManager != null)
  108. {
  109. if (removedMessage != null)
  110. {
  111. await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
  112. }
  113. await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  114. }
  115. }
  116. public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  117. {
  118. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  119. lock (_subscriptions)
  120. {
  121. foreach (var topicFilter in topicFilters)
  122. {
  123. _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
  124. _subscriptionsNotPushed = true;
  125. }
  126. }
  127. return Task.FromResult(0);
  128. }
  129. public Task UnsubscribeAsync(IEnumerable<string> topics)
  130. {
  131. if (topics == null) throw new ArgumentNullException(nameof(topics));
  132. lock (_subscriptions)
  133. {
  134. foreach (var topic in topics)
  135. {
  136. if (_subscriptions.Remove(topic))
  137. {
  138. _unsubscriptions.Add(topic);
  139. _subscriptionsNotPushed = true;
  140. }
  141. }
  142. }
  143. return Task.FromResult(0);
  144. }
  145. public void Dispose()
  146. {
  147. _connectionCancellationToken?.Dispose();
  148. _publishingCancellationToken?.Dispose();
  149. }
  150. private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  151. {
  152. try
  153. {
  154. while (!cancellationToken.IsCancellationRequested)
  155. {
  156. await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
  157. }
  158. }
  159. catch (OperationCanceledException)
  160. {
  161. }
  162. catch (Exception exception)
  163. {
  164. _logger.Error(exception, "Unhandled exception while maintaining connection.");
  165. }
  166. finally
  167. {
  168. await _mqttClient.DisconnectAsync().ConfigureAwait(false);
  169. _logger.Info("Stopped");
  170. }
  171. }
  172. private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  173. {
  174. try
  175. {
  176. var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
  177. if (connectionState == ReconnectionResult.NotConnected)
  178. {
  179. StopPublishing();
  180. await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  181. return;
  182. }
  183. if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
  184. {
  185. await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
  186. StartPublishing();
  187. return;
  188. }
  189. if (connectionState == ReconnectionResult.StillConnected)
  190. {
  191. await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
  192. }
  193. }
  194. catch (OperationCanceledException)
  195. {
  196. }
  197. catch (MqttCommunicationException exception)
  198. {
  199. _logger.Warning(exception, "Communication exception while maintaining connection.");
  200. }
  201. catch (Exception exception)
  202. {
  203. _logger.Error(exception, "Unhandled exception while maintaining connection.");
  204. }
  205. }
  206. private void PublishQueuedMessages(CancellationToken cancellationToken)
  207. {
  208. try
  209. {
  210. while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
  211. {
  212. //Peek at the message without dequeueing in order to prevent the
  213. //possibility of the queue growing beyond the configured cap.
  214. //Previously, messages could be re-enqueued if there was an
  215. //exception, and this re-enqueueing did not honor the cap.
  216. //Furthermore, because re-enqueueing would shuffle the order
  217. //of the messages, the DropOldestQueuedMessage strategy would
  218. //be unable to know which message is actually the oldest and would
  219. //instead drop the first item in the queue.
  220. var message = _messageQueue.PeekAndWait();
  221. if (message == null)
  222. {
  223. continue;
  224. }
  225. cancellationToken.ThrowIfCancellationRequested();
  226. TryPublishQueuedMessage(message);
  227. }
  228. }
  229. catch (OperationCanceledException)
  230. {
  231. }
  232. catch (Exception exception)
  233. {
  234. _logger.Error(exception, "Unhandled exception while publishing queued application messages.");
  235. }
  236. finally
  237. {
  238. _logger.Verbose("Stopped publishing messages.");
  239. }
  240. }
  241. private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)
  242. {
  243. Exception transmitException = null;
  244. try
  245. {
  246. _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
  247. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
  248. {
  249. //While publishing this message, this.PublishAsync could have booted this
  250. //message off the queue to make room for another (when using a cap
  251. //with the DropOldestQueuedMessage strategy). If the first item
  252. //in the queue is equal to this message, then it's safe to remove
  253. //it from the queue. If not, that means this.PublishAsync has already
  254. //removed it, in which case we don't want to do anything.
  255. _messageQueue.RemoveFirstIfEqual(message);
  256. }
  257. _storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
  258. }
  259. catch (MqttCommunicationException exception)
  260. {
  261. transmitException = exception;
  262. _logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");
  263. if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  264. {
  265. //If QoS 0, we don't want this message to stay on the queue.
  266. //If QoS 1 or 2, it's possible that, when using a cap, this message
  267. //has been booted off the queue by this.PublishAsync, in which case this
  268. //thread will not continue to try to publish it. While this does
  269. //contradict the expected behavior of QoS 1 and 2, that's also true
  270. //for the usage of a message queue cap, so it's still consistent
  271. //with prior behavior in that way.
  272. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
  273. {
  274. _messageQueue.RemoveFirstIfEqual(message);
  275. }
  276. }
  277. }
  278. catch (Exception exception)
  279. {
  280. transmitException = exception;
  281. _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id}).");
  282. }
  283. finally
  284. {
  285. ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
  286. }
  287. }
  288. private async Task SynchronizeSubscriptionsAsync()
  289. {
  290. _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions");
  291. List<TopicFilter> subscriptions;
  292. HashSet<string> unsubscriptions;
  293. lock (_subscriptions)
  294. {
  295. subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();
  296. unsubscriptions = new HashSet<string>(_unsubscriptions);
  297. _unsubscriptions.Clear();
  298. _subscriptionsNotPushed = false;
  299. }
  300. if (!subscriptions.Any() && !unsubscriptions.Any())
  301. {
  302. return;
  303. }
  304. try
  305. {
  306. if (unsubscriptions.Any())
  307. {
  308. await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
  309. }
  310. if (subscriptions.Any())
  311. {
  312. await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
  313. }
  314. }
  315. catch (Exception exception)
  316. {
  317. _logger.Warning(exception, "Synchronizing subscriptions failed.");
  318. _subscriptionsNotPushed = true;
  319. SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
  320. }
  321. }
  322. private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  323. {
  324. if (_mqttClient.IsConnected)
  325. {
  326. return ReconnectionResult.StillConnected;
  327. }
  328. try
  329. {
  330. await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
  331. return ReconnectionResult.Reconnected;
  332. }
  333. catch (Exception exception)
  334. {
  335. ConnectingFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
  336. return ReconnectionResult.NotConnected;
  337. }
  338. }
  339. private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  340. {
  341. ApplicationMessageReceived?.Invoke(this, eventArgs);
  342. }
  343. private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
  344. {
  345. Disconnected?.Invoke(this, eventArgs);
  346. }
  347. private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
  348. {
  349. Connected?.Invoke(this, eventArgs);
  350. }
  351. private void StartPublishing()
  352. {
  353. if (_publishingCancellationToken != null)
  354. {
  355. StopPublishing();
  356. }
  357. var cts = new CancellationTokenSource();
  358. _publishingCancellationToken = cts;
  359. Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
  360. }
  361. private void StopPublishing()
  362. {
  363. _publishingCancellationToken?.Cancel(false);
  364. _publishingCancellationToken?.Dispose();
  365. _publishingCancellationToken = null;
  366. }
  367. private void StopMaintainingConnection()
  368. {
  369. _connectionCancellationToken?.Cancel(false);
  370. _connectionCancellationToken?.Dispose();
  371. _connectionCancellationToken = null;
  372. }
  373. }
  374. }