You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

ManagedMqttClient.cs 20 KiB

7 years ago
6 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Client.Connecting;
  8. using MQTTnet.Client.Disconnecting;
  9. using MQTTnet.Client.Publishing;
  10. using MQTTnet.Client.Receiving;
  11. using MQTTnet.Diagnostics;
  12. using MQTTnet.Exceptions;
  13. using MQTTnet.Internal;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  /// <summary>
  /// Wraps an <see cref="IMqttClient"/> and keeps it connected from a background task.
  /// Messages passed to <c>PublishAsync</c> are buffered in an internal queue and sent by a
  /// separate background publishing task; requested subscriptions are pushed to the wrapped
  /// client after every (re)connect and whenever they change.
  /// </summary>
  public class ManagedMqttClient : IManagedMqttClient
  {
      // Outgoing messages waiting to be sent by the background publishing task.
      private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

      // Requested subscriptions (topic -> QoS). This instance is also the lock object that
      // guards updates to _subscriptions, _unsubscriptions and _subscriptionsNotPushed.
      private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

      // Topics that still have to be unsubscribed at the wrapped client.
      private readonly HashSet<string> _unsubscriptions = new HashSet<string>();

      private readonly IMqttClient _mqttClient;
      private readonly IMqttNetChildLogger _logger;

      // Serializes queue/storage changes made by PublishAsync and by the publishing task.
      private readonly AsyncLock _messageQueueLock = new AsyncLock();

      // Non-null while the connection maintenance loop is supposed to run (see IsStarted).
      private CancellationTokenSource _connectionCancellationToken;

      // Non-null while a background publishing task is supposed to run.
      private CancellationTokenSource _publishingCancellationToken;

      private Task _maintainConnectionTask;
      private ManagedMqttClientStorageManager _storageManager;

      private bool _disposed;

      // True when _subscriptions/_unsubscriptions contain changes not yet sent to the wrapped client.
      private bool _subscriptionsNotPushed;

      /// <summary>
      /// Creates a managed client on top of the given low level client.
      /// </summary>
      /// <param name="mqttClient">The client used for all communication. It is disposed together with this instance.</param>
      /// <param name="logger">The parent logger; a child logger named after this class is created from it.</param>
      /// <exception cref="ArgumentNullException"><paramref name="mqttClient"/> or <paramref name="logger"/> is null.</exception>
      public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
      {
          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

          if (logger == null) throw new ArgumentNullException(nameof(logger));
          _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
      }

      /// <summary>Gets whether the wrapped client is currently connected.</summary>
      public bool IsConnected => _mqttClient.IsConnected;

      /// <summary>Gets whether the client was started and not stopped or disposed since.</summary>
      public bool IsStarted => _connectionCancellationToken != null;

      /// <summary>Gets the number of messages currently held in the internal queue.</summary>
      public int PendingApplicationMessagesCount => _messageQueue.Count;

      /// <summary>Gets the options passed to the most recent <see cref="StartAsync"/> call (null before the first start).</summary>
      public IManagedMqttClientOptions Options { get; private set; }

      /// <summary>Gets or sets the connected handler; forwarded to the wrapped client.</summary>
      public IMqttClientConnectedHandler ConnectedHandler
      {
          get => _mqttClient.ConnectedHandler;
          set => _mqttClient.ConnectedHandler = value;
      }

      /// <summary>Gets or sets the disconnected handler; forwarded to the wrapped client.</summary>
      public IMqttClientDisconnectedHandler DisconnectedHandler
      {
          get => _mqttClient.DisconnectedHandler;
          set => _mqttClient.DisconnectedHandler = value;
      }

      /// <summary>Gets or sets the handler for received application messages; forwarded to the wrapped client.</summary>
      public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
      {
          get => _mqttClient.ApplicationMessageReceivedHandler;
          set => _mqttClient.ApplicationMessageReceivedHandler = value;
      }

      /// <summary>Gets or sets the handler invoked after every attempt to publish a queued message (successful or not).</summary>
      public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

      /// <summary>Gets or sets the handler invoked when a message is dropped because the internal queue is full.</summary>
      public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

      /// <summary>Gets or sets the handler invoked when connecting the wrapped client fails.</summary>
      public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

      /// <summary>Gets or sets the handler invoked when pushing the subscriptions to the wrapped client fails.</summary>
      public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }

      /// <summary>
      /// Stores the options, loads previously persisted messages (when a storage is configured)
      /// and starts the background task which maintains the connection.
      /// </summary>
      /// <param name="options">The managed client options. <c>ClientOptions</c> must be set and must use a clean session.</param>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
      /// <exception cref="ArgumentException">The client options are not set.</exception>
      /// <exception cref="NotSupportedException">The client options do not request a clean session.</exception>
      /// <exception cref="InvalidOperationException">The maintenance task of a previous start is still running.</exception>
      public async Task StartAsync(IManagedMqttClientOptions options)
      {
          ThrowIfDisposed();

          if (options == null) throw new ArgumentNullException(nameof(options));
          if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

          if (!options.ClientOptions.CleanSession)
          {
              throw new NotSupportedException("The managed client does not support existing sessions.");
          }

          if (_maintainConnectionTask != null && !_maintainConnectionTask.IsCompleted)
          {
              throw new InvalidOperationException("The managed client is already started.");
          }

          Options = options;

          if (Options.Storage != null)
          {
              _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
              var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

              foreach (var message in messages)
              {
                  _messageQueue.Enqueue(message);
              }
          }

          // Capture the source and its token in locals. The lambda below runs later on the
          // thread pool; reading the field there could hit a null or disposed source when
          // StopAsync or Dispose was called in the meantime.
          var cancellationTokenSource = new CancellationTokenSource();
          var cancellationToken = cancellationTokenSource.Token;
          _connectionCancellationToken = cancellationTokenSource;

          var maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
          _maintainConnectionTask = maintainConnectionTask;
          maintainConnectionTask.Forget(_logger);

          _logger.Info("Started");
      }

      /// <summary>
      /// Cancels the publishing and connection maintenance tasks, clears the internal queue
      /// and waits until the maintenance task has finished.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      public async Task StopAsync()
      {
          ThrowIfDisposed();

          StopPublishing();
          StopMaintainingConnection();

          _messageQueue.Clear();

          if (_maintainConnectionTask != null)
          {
              // WhenAny is used so that a faulted or cancelled maintenance task does not throw here.
              await Task.WhenAny(_maintainConnectionTask).ConfigureAwait(false);
              _maintainConnectionTask = null;
          }
      }

      /// <summary>
      /// Wraps the message into a <see cref="ManagedMqttApplicationMessage"/> and enqueues it.
      /// </summary>
      /// <param name="applicationMessage">The message to enqueue.</param>
      /// <param name="cancellationToken">Currently not observed by this method.</param>
      /// <returns>A new, default constructed result. It is created right after enqueueing and carries no broker response.</returns>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
      public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
      {
          ThrowIfDisposed();

          if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

          await PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()).ConfigureAwait(false);
          return new MqttClientPublishResult();
      }

      /// <summary>
      /// Enqueues the message for publishing by the background publishing task and persists it
      /// when a storage is configured. When the queue already holds <c>Options.MaxPendingMessages</c>
      /// messages, the configured overflow strategy decides whether the new message or the oldest
      /// queued message is dropped; the skipped handler is invoked for the dropped message.
      /// </summary>
      /// <param name="applicationMessage">The message to enqueue.</param>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
      /// <exception cref="InvalidOperationException">The client was never started, so no options are available.</exception>
      public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
      {
          ThrowIfDisposed();

          if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

          MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

          // The queue limits below come from Options, which is only assigned by StartAsync.
          if (Options == null) throw new InvalidOperationException("The managed client is not started. Call StartAsync before publishing messages.");

          ManagedMqttApplicationMessage removedMessage = null;
          ApplicationMessageSkippedEventArgs applicationMessageSkippedEventArgs = null;

          try
          {
              using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
              {
                  if (_messageQueue.Count >= Options.MaxPendingMessages)
                  {
                      if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
                      {
                          _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                          applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
                          return;
                      }

                      if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
                      {
                          removedMessage = _messageQueue.RemoveFirst();
                          _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                          applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
                      }
                  }

                  _messageQueue.Enqueue(applicationMessage);

                  if (_storageManager != null)
                  {
                      if (removedMessage != null)
                      {
                          await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
                      }

                      await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
                  }
              }
          }
          finally
          {
              // Raised outside of the queue lock so that the handler cannot block other publishers.
              if (applicationMessageSkippedEventArgs != null)
              {
                  var applicationMessageSkippedHandler = ApplicationMessageSkippedHandler;
                  if (applicationMessageSkippedHandler != null)
                  {
                      await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
                  }
              }
          }
      }

      /// <summary>
      /// Records the topic filters as requested subscriptions. Nothing is sent here; the
      /// connection maintenance task pushes the change to the wrapped client later.
      /// </summary>
      /// <param name="topicFilters">The filters to subscribe. An existing entry for the same topic is overwritten.</param>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
      public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          ThrowIfDisposed();

          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          lock (_subscriptions)
          {
              foreach (var topicFilter in topicFilters)
              {
                  _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
                  _subscriptionsNotPushed = true;
              }
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Removes the topics from the requested subscriptions and remembers them for
      /// unsubscribing. Topics which were not subscribed through this instance are ignored.
      /// Nothing is sent here; the connection maintenance task pushes the change later.
      /// </summary>
      /// <param name="topics">The topics to unsubscribe.</param>
      /// <exception cref="ObjectDisposedException">The client is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="topics"/> is null.</exception>
      public Task UnsubscribeAsync(IEnumerable<string> topics)
      {
          ThrowIfDisposed();

          if (topics == null) throw new ArgumentNullException(nameof(topics));

          lock (_subscriptions)
          {
              foreach (var topic in topics)
              {
                  if (_subscriptions.Remove(topic))
                  {
                      _unsubscriptions.Add(topic);
                      _subscriptionsNotPushed = true;
                  }
              }
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Stops the background tasks, waits for the maintenance task to end and disposes the
      /// queue lock and the wrapped client. Calling it more than once has no effect.
      /// </summary>
      public void Dispose()
      {
          if (_disposed)
          {
              return;
          }

          // Set first: the maintenance task skips its own disconnect when the client is
          // disposed, because the wrapped client is disposed below anyway.
          _disposed = true;

          StopPublishing();
          StopMaintainingConnection();

          if (_maintainConnectionTask != null)
          {
              // WaitAny (unlike Wait) does not rethrow the exception of a faulted task.
              Task.WaitAny(_maintainConnectionTask);
              _maintainConnectionTask = null;
          }

          _messageQueueLock.Dispose();
          _mqttClient.Dispose();
      }

      // Throws ObjectDisposedException when Dispose was already called.
      private void ThrowIfDisposed()
      {
          if (_disposed)
          {
              throw new ObjectDisposedException(nameof(ManagedMqttClient));
          }
      }

      // Body of the connection maintenance task: repeats TryMaintainConnectionAsync until
      // cancellation is requested and finally disconnects the wrapped client (unless disposed).
      private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when the client is stopped.
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Unhandled exception while maintaining connection.");
          }
          finally
          {
              if (!_disposed)
              {
                  try
                  {
                      await _mqttClient.DisconnectAsync().ConfigureAwait(false);
                  }
                  catch (Exception exception)
                  {
                      _logger.Error(exception, "Error while disconnecting.");
                  }

                  _logger.Info("Stopped");
              }
          }
      }

      // One iteration of the maintenance loop: (re)connects when required, pushes pending
      // subscription changes, (re)starts publishing and waits for the configured delay.
      // All exceptions are handled here so that the surrounding loop keeps running.
      private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
              if (connectionState == ReconnectionResult.NotConnected)
              {
                  StopPublishing();
                  await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  return;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  return;
              }

              if (connectionState == ReconnectionResult.StillConnected)
              {
                  await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when the client is stopped; the caller's loop checks the token.
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning(exception, "Communication exception while maintaining connection.");
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Unhandled exception while maintaining connection.");
          }
      }

      // Body of the publishing task: publishes queued messages one by one while the wrapped
      // client is connected and cancellation is not requested.
      private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
              {
                  // Peek at the message without dequeueing in order to prevent the
                  // possibility of the queue growing beyond the configured cap.
                  // Previously, messages could be re-enqueued if there was an
                  // exception, and this re-enqueueing did not honor the cap.
                  // Furthermore, because re-enqueueing would shuffle the order
                  // of the messages, the DropOldestQueuedMessage strategy would
                  // be unable to know which message is actually the oldest and would
                  // instead drop the first item in the queue.
                  var message = _messageQueue.PeekAndWait();
                  if (message == null)
                  {
                      continue;
                  }

                  // The peek above does not take the token, so check again before sending.
                  cancellationToken.ThrowIfCancellationRequested();

                  await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when publishing is stopped.
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Error while publishing queued application messages.");
          }
          finally
          {
              _logger.Verbose("Stopped publishing messages.");
          }
      }

      // Publishes a single queued message through the wrapped client. The message is removed
      // from the queue (and storage) on success, and also on a communication error when it is
      // QoS 0. The processed handler is invoked in every case with the exception (if any).
      private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
      {
          Exception transmitException = null;
          try
          {
              await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

              await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
          }
          catch (MqttCommunicationException exception)
          {
              transmitException = exception;

              _logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");

              if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
              {
                  // If QoS 0, we don't want this message to stay on the queue.
                  // If QoS 1 or 2, it's possible that, when using a cap, this message
                  // has been booted off the queue by this.PublishAsync, in which case this
                  // thread will not continue to try to publish it. While this does
                  // contradict the expected behavior of QoS 1 and 2, that's also true
                  // for the usage of a message queue cap, so it's still consistent
                  // with prior behavior in that way.
                  await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (Exception exception)
          {
              transmitException = exception;
              _logger.Error(exception, $"Error while publishing application message ({message.Id}).");
          }
          finally
          {
              var eventHandler = ApplicationMessageProcessedHandler;
              if (eventHandler != null)
              {
                  var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException);
                  await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false);
              }
          }
      }

      // Removes the given message from the queue and from the storage (when configured).
      // The queue lock is taken to avoid conflicts with this.PublishAsync.
      private async Task RemoveQueuedMessageAsync(ManagedMqttApplicationMessage message)
      {
          using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
          {
              // While publishing this message, this.PublishAsync could have booted this
              // message off the queue to make room for another (when using a cap
              // with the DropOldestQueuedMessage strategy). If the first item
              // in the queue is equal to this message, then it's safe to remove
              // it from the queue. If not, that means this.PublishAsync has already
              // removed it, in which case we don't want to do anything.
              _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));

              if (_storageManager != null)
              {
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
      }

      // Sends the pending unsubscriptions and then all requested subscriptions to the wrapped
      // client. On failure the change is marked as not pushed again and the failed handler is
      // invoked; the exception is not rethrown.
      private async Task SynchronizeSubscriptionsAsync()
      {
          _logger.Info("Synchronizing subscriptions");

          List<TopicFilter> subscriptions;
          HashSet<string> unsubscriptions;

          // Take a snapshot under the lock; the network calls below run without it.
          lock (_subscriptions)
          {
              subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();

              unsubscriptions = new HashSet<string>(_unsubscriptions);
              _unsubscriptions.Clear();

              _subscriptionsNotPushed = false;
          }

          if (!subscriptions.Any() && !unsubscriptions.Any())
          {
              return;
          }

          var unsubscribeCompleted = false;
          try
          {
              if (unsubscriptions.Any())
              {
                  await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
              }

              unsubscribeCompleted = true;

              if (subscriptions.Any())
              {
                  await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
              }
          }
          catch (Exception exception)
          {
              _logger.Warning(exception, "Synchronizing subscriptions failed.");

              lock (_subscriptions)
              {
                  if (!unsubscribeCompleted)
                  {
                      // The pending unsubscriptions were already removed from the shared set
                      // above. Put them back so that the retry sends them again, except for
                      // topics which have been subscribed again in the meantime.
                      foreach (var topic in unsubscriptions)
                      {
                          if (!_subscriptions.ContainsKey(topic))
                          {
                              _unsubscriptions.Add(topic);
                          }
                      }
                  }

                  _subscriptionsNotPushed = true;
              }

              // Check the captured reference (not the property) so that a concurrent
              // change of the property cannot lead to a null dereference.
              var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
              if (synchronizingSubscriptionsFailedHandler != null)
              {
                  await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
              }
          }
      }

      // Connects the wrapped client when it is not connected. A failed attempt is reported
      // to the connecting failed handler and returned as NotConnected instead of thrown.
      private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
      {
          if (_mqttClient.IsConnected)
          {
              return ReconnectionResult.StillConnected;
          }

          try
          {
              await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception exception)
          {
              var connectingFailedHandler = ConnectingFailedHandler;
              if (connectingFailedHandler != null)
              {
                  await connectingFailedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
              }

              return ReconnectionResult.NotConnected;
          }
      }

      // Cancels a running publishing task (if any) and starts a new one.
      private void StartPublishing()
      {
          if (_publishingCancellationToken != null)
          {
              StopPublishing();
          }

          // The local keeps the lambda independent from later changes of the field.
          var cts = new CancellationTokenSource();
          _publishingCancellationToken = cts;

          Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger);
      }

      // Requests cancellation of the publishing task and releases its token source.
      private void StopPublishing()
      {
          _publishingCancellationToken?.Cancel(false);
          _publishingCancellationToken?.Dispose();
          _publishingCancellationToken = null;
      }

      // Requests cancellation of the maintenance task and releases its token source.
      // Afterwards IsStarted reports false.
      private void StopMaintainingConnection()
      {
          _connectionCancellationToken?.Cancel(false);
          _connectionCancellationToken?.Dispose();
          _connectionCancellationToken = null;
      }
  }
  451. }