You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

682 lines
26 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. using MQTTnet.Diagnostics.Logger;
  17. namespace MQTTnet.Extensions.ManagedClient
  18. {
  19. public class ManagedMqttClient : Disposable, IManagedMqttClient
  20. {
  // Application messages waiting to be sent; _messageQueueLock serializes the
  // enqueue/overflow handling in PublishAsync against removal by the publisher.
  readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  readonly AsyncLock _messageQueueLock = new AsyncLock();

  /// <summary>
  /// Subscription bookkeeping is split into two buckets.
  /// <see cref="_subscriptions"/> and <see cref="_unsubscriptions"/> collect the pending changes
  /// requested through the public API; they are guarded by <c>lock (_subscriptions)</c>.
  /// Once processed, the changes are folded into <see cref="_reconnectSubscriptions"/>, which is
  /// sent to the broker again after a reconnect and is only touched from
  /// <see cref="MaintainConnectionAsync"/> and the methods it calls.
  /// </summary>
  readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  readonly HashSet<string> _unsubscriptions = new HashSet<string>();

  // Released by SubscribeAsync/UnsubscribeAsync to wake up PublishSubscriptionsAsync.
  readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);

  readonly MqttNetSourceLogger _logger;

  // Lifetime state of the two background loops (connection maintenance and publishing).
  CancellationTokenSource _connectionCancellationToken;
  CancellationTokenSource _publishingCancellationToken;
  Task _maintainConnectionTask;

  // Only created by StartAsync when the options provide a storage.
  ManagedMqttClientStorageManager _storageManager;

  /// <summary>
  /// Creates a managed client that wraps the given low level client.
  /// </summary>
  /// <param name="mqttClient">The client used for the actual communication.</param>
  /// <param name="logger">The logger from which a source logger for this class is derived.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="mqttClient"/> or <paramref name="logger"/> is <c>null</c>.
  /// </exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  {
      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      InternalClient = mqttClient;
      _logger = logger.WithSource(nameof(ManagedMqttClient));
  }
  /// <summary>Reports whether the wrapped client is currently connected.</summary>
  public bool IsConnected
  {
      get { return InternalClient.IsConnected; }
  }

  /// <summary>
  /// <c>true</c> while a connection cancellation token source exists, i.e. between
  /// <see cref="StartAsync"/> and the point where connection maintenance is stopped.
  /// </summary>
  public bool IsStarted
  {
      get { return _connectionCancellationToken != null; }
  }

  /// <summary>The wrapped low level client passed to the constructor.</summary>
  public IMqttClient InternalClient { get; }

  /// <summary>Number of application messages currently held in the internal queue.</summary>
  public int PendingApplicationMessagesCount
  {
      get { return _messageQueue.Count; }
  }

  /// <summary>The options passed to <see cref="StartAsync"/>; <c>null</c> before the first start.</summary>
  public IManagedMqttClientOptions Options { get; private set; }

  /// <summary>Forwarded to the connected handler of <see cref="InternalClient"/>.</summary>
  public IMqttClientConnectedHandler ConnectedHandler
  {
      get { return InternalClient.ConnectedHandler; }
      set { InternalClient.ConnectedHandler = value; }
  }

  /// <summary>Forwarded to the disconnected handler of <see cref="InternalClient"/>.</summary>
  public IMqttClientDisconnectedHandler DisconnectedHandler
  {
      get { return InternalClient.DisconnectedHandler; }
      set { InternalClient.DisconnectedHandler = value; }
  }

  /// <summary>Forwarded to the application message received handler of <see cref="InternalClient"/>.</summary>
  public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  {
      get { return InternalClient.ApplicationMessageReceivedHandler; }
      set { InternalClient.ApplicationMessageReceivedHandler = value; }
  }

  /// <summary>Invoked after every attempt to publish a queued message, successful or not.</summary>
  public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

  /// <summary>Invoked when a message is dropped because the internal queue is full.</summary>
  public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

  /// <summary>Invoked when connecting to the broker fails.</summary>
  public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

  /// <summary>Invoked when sending subscriptions or unsubscriptions to the broker fails.</summary>
  public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  /// <summary>
  /// Starts the managed client: remembers the options, restores persisted messages (when a
  /// storage is configured) and launches the background task that maintains the connection.
  /// </summary>
  /// <param name="options">The managed client options; the client options must be set.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentException">The client options are not set.</exception>
  /// <exception cref="InvalidOperationException">A previous maintenance task is still running.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      ThrowIfDisposed();

      if (options == null)
      {
          throw new ArgumentNullException(nameof(options));
      }

      if (options.ClientOptions == null)
      {
          throw new ArgumentException("The client options are not set.", nameof(options));
      }

      // A missing task counts as "not started"; only a task that has not completed blocks a start.
      var previousTask = _maintainConnectionTask;
      if (previousTask != null && !previousTask.IsCompleted)
      {
          throw new InvalidOperationException("The managed client is already started.");
      }

      Options = options;

      if (options.Storage != null)
      {
          _storageManager = new ManagedMqttClientStorageManager(options.Storage);

          var storedMessages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
          foreach (var storedMessage in storedMessages)
          {
              _messageQueue.Enqueue(storedMessage);
          }
      }

      // Take the token before publishing the source to the field so the background task
      // never has to read the field.
      var connectionTokenSource = new CancellationTokenSource();
      var connectionToken = connectionTokenSource.Token;
      _connectionCancellationToken = connectionTokenSource;

      _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(connectionToken), connectionToken);
      _maintainConnectionTask.RunInBackground(_logger);

      _logger.Info("Started");
  }
  /// <summary>
  /// Stops publishing and connection maintenance, discards all queued messages and waits
  /// for the background maintenance task to finish.
  /// </summary>
  public async Task StopAsync()
  {
      ThrowIfDisposed();

      StopPublishing();
      StopMaintainingConnection();

      _messageQueue.Clear();

      if (_maintainConnectionTask != null)
      {
          // WhenAny is used so that a canceled or faulted maintenance task does not throw here.
          // ConfigureAwait(false) matches every other await in this class and avoids a deadlock
          // when a caller blocks on StopAsync from a thread with a synchronization context.
          await Task.WhenAny(_maintainConnectionTask).ConfigureAwait(false);
          _maintainConnectionTask = null;
      }
  }
  /// <summary>Sends a ping through the wrapped <see cref="InternalClient"/>.</summary>
  /// <param name="cancellationToken">Token passed on to the wrapped client.</param>
  public Task PingAsync(CancellationToken cancellationToken) => InternalClient.PingAsync(cancellationToken);
  /// <summary>
  /// Wraps the message into a <see cref="ManagedMqttApplicationMessage"/> and enqueues it.
  /// The returned result is a fresh, empty instance; it does not describe an actual
  /// transmission because the message is only queued here.
  /// </summary>
  /// <param name="applicationMessage">The message to enqueue.</param>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
  public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      var managedMessage = new ManagedMqttApplicationMessageBuilder()
          .WithApplicationMessage(applicationMessage)
          .Build();

      await PublishAsync(managedMessage).ConfigureAwait(false);

      return new MqttClientPublishResult();
  }
  /// <summary>
  /// Enqueues a managed message for publishing. When the queue has reached
  /// <c>Options.MaxPendingMessages</c> the configured overflow strategy decides whether the
  /// new message or the oldest queued message is dropped. A dropped message is reported to
  /// <see cref="ApplicationMessageSkippedHandler"/>.
  /// </summary>
  /// <param name="applicationMessage">The message to enqueue.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
  /// <exception cref="InvalidOperationException">The client has never been started.</exception>
  public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      if (Options == null)
      {
          throw new InvalidOperationException("call StartAsync before publishing messages");
      }

      MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

      ManagedMqttApplicationMessage evictedMessage = null;
      ApplicationMessageSkippedEventArgs skippedEventArgs = null;

      try
      {
          using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
          {
              var queueIsFull = _messageQueue.Count >= Options.MaxPendingMessages;
              if (queueIsFull)
              {
                  switch (Options.PendingMessagesOverflowStrategy)
                  {
                      case MqttPendingMessagesOverflowStrategy.DropNewMessage:
                          _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
                          // The finally block below still reports the skipped message.
                          return;

                      case MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage:
                          evictedMessage = _messageQueue.RemoveFirst();
                          _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(evictedMessage);
                          break;
                  }
              }

              _messageQueue.Enqueue(applicationMessage);

              if (_storageManager != null)
              {
                  // Keep the persisted queue in step with the in-memory queue.
                  if (evictedMessage != null)
                  {
                      await _storageManager.RemoveAsync(evictedMessage).ConfigureAwait(false);
                  }

                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }
          }
      }
      finally
      {
          // Runs outside of the queue lock, also when one of the steps above threw.
          if (skippedEventArgs != null)
          {
              var skippedHandler = ApplicationMessageSkippedHandler;
              if (skippedHandler != null)
              {
                  await skippedHandler.HandleApplicationMessageSkippedAsync(skippedEventArgs).ConfigureAwait(false);
              }
          }
      }
  }
  /// <summary>
  /// Queues the given topic filters for subscription. The filters are stored as pending
  /// subscriptions (removing any pending unsubscription of the same topic) and the
  /// background loop is signaled; nothing is sent to the broker by this method itself.
  /// </summary>
  /// <param name="topicFilters">The topic filters to subscribe to.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
  public Task SubscribeAsync(IEnumerable<MqttTopicFilter> topicFilters)
  {
      ThrowIfDisposed();

      if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

      // Enumerate the caller's sequence exactly once. Validating one enumeration and storing
      // a second one would let a lazy or changing sequence register filters that were never
      // validated, and it would run the caller's iterator while holding the lock.
      var requestedFilters = topicFilters.ToList();

      foreach (var topicFilter in requestedFilters)
      {
          MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic);
      }

      lock (_subscriptions)
      {
          foreach (var topicFilter in requestedFilters)
          {
              _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
              _unsubscriptions.Remove(topicFilter.Topic);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Queues the given topics for unsubscription. Each topic is removed from the pending
  /// subscriptions and added to the pending unsubscriptions, then the background loop is
  /// signaled; nothing is sent to the broker by this method itself.
  /// </summary>
  /// <param name="topics">The topics to unsubscribe from.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
  public Task UnsubscribeAsync(IEnumerable<string> topics)
  {
      ThrowIfDisposed();

      if (topics == null)
      {
          throw new ArgumentNullException(nameof(topics));
      }

      // _subscriptions doubles as the lock object for both pending collections.
      lock (_subscriptions)
      {
          foreach (var topicToRemove in topics)
          {
              _subscriptions.Remove(topicToRemove);
              _unsubscriptions.Add(topicToRemove);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Stops both background loops, waits for the connection maintenance task and then
  /// releases the queue, the queue lock, the wrapped client and the subscription signal.
  /// </summary>
  /// <param name="disposing">
  /// <c>true</c> when called from an explicit dispose; only then are managed resources released.
  /// </param>
  protected override void Dispose(bool disposing)
  {
      if (disposing)
      {
          StopPublishing();
          StopMaintainingConnection();

          if (_maintainConnectionTask != null)
          {
              try
              {
                  _maintainConnectionTask.GetAwaiter().GetResult();
              }
              catch (OperationCanceledException)
              {
                  // The task was created by Task.Run with the connection token. If that token
                  // is canceled before the delegate starts, the task ends in the Canceled state
                  // and GetResult throws. Dispose must not throw for that, and the resources
                  // below must still be released.
              }
              finally
              {
                  _maintainConnectionTask = null;
              }
          }

          _messageQueue.Dispose();
          _messageQueueLock.Dispose();
          InternalClient.Dispose();
          _subscriptionsQueuedSignal.Dispose();
      }

      base.Dispose(disposing);
  }
  /// <summary>
  /// Background loop which keeps calling <see cref="TryMaintainConnectionAsync"/> until the
  /// token is canceled. On exit the wrapped client is disconnected (unless this instance is
  /// already disposed) and all subscription state is cleared.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Cancellation is the regular way to leave the loop.
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error exception while maintaining connection.");
      }
      finally
      {
          if (!IsDisposed)
          {
              await DisconnectInternalClientAsync().ConfigureAwait(false);
              _logger.Info("Stopped");
          }

          _reconnectSubscriptions.Clear();

          lock (_subscriptions)
          {
              _subscriptions.Clear();
              _unsubscriptions.Clear();
          }
      }
  }

  // Disconnects the wrapped client, bounded by the configured communication timeout.
  // Failures are logged and never propagated.
  async Task DisconnectInternalClientAsync()
  {
      try
      {
          using (var timeoutSource = new CancellationTokenSource(Options.ClientOptions.CommunicationTimeout))
          {
              await InternalClient.DisconnectAsync(timeoutSource.Token).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          _logger.Warning("Timeout while sending DISCONNECT packet.");
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while disconnecting.");
      }
  }
  /// <summary>
  /// Performs one maintenance step: (re)connects when required and reacts to the outcome.
  /// Exceptions are logged and swallowed so the surrounding loop keeps running.
  /// </summary>
  /// <param name="cancellationToken">Token observed while connecting, delaying and waiting.</param>
  async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var reconnectionResult = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false);

          switch (reconnectionResult)
          {
              case ReconnectionResult.NotConnected:
                  // Nothing can be sent; pause before the next connection attempt.
                  StopPublishing();
                  await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  break;

              case ReconnectionResult.Reconnected:
                  // No session on the broker: the known subscriptions are sent again.
                  await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  break;

              case ReconnectionResult.Recovered:
                  // The broker reported an existing session: only publishing is restarted.
                  StartPublishing();
                  break;

              case ReconnectionResult.StillConnected:
                  // Use the idle time until the next connection check to send pending
                  // subscription changes.
                  await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
                  break;
          }
      }
      catch (OperationCanceledException)
      {
      }
      catch (MqttCommunicationException exception)
      {
          _logger.Warning(exception, "Communication error while maintaining connection.");
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Background loop which publishes the queued messages one by one while the wrapped client
  /// is connected and the token is not canceled.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested && InternalClient.IsConnected)
          {
              // The message is only peeked, not dequeued, so the queue can never grow
              // beyond the configured cap. Earlier versions re-enqueued a message after
              // a failure; that ignored the cap and shuffled the message order, which
              // in turn made DropOldestQueuedMessage drop whatever happened to be first
              // instead of the message that really was the oldest.
              var nextMessage = _messageQueue.PeekAndWait(cancellationToken);
              if (nextMessage != null)
              {
                  cancellationToken.ThrowIfCancellationRequested();

                  await TryPublishQueuedMessageAsync(nextMessage).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while publishing queued application messages.");
      }
      finally
      {
          _logger.Verbose("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes a single queued message and removes it from the queue (and storage) on
  /// success. Failures are logged; <see cref="ApplicationMessageProcessedHandler"/> is
  /// notified in every case with the exception, if any.
  /// </summary>
  /// <param name="message">The message which is currently first in the queue.</param>
  async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      Exception publishFailure = null;

      try
      {
          await InternalClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

          await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
      }
      catch (MqttCommunicationException exception)
      {
          publishFailure = exception;

          _logger.Warning(exception, "Publishing application message ({0}) failed.", message.Id);

          // A QoS 0 message must not stay on the queue after a failed attempt.
          // A QoS 1 or 2 message stays queued. When a cap is used it may already have been
          // booted off the queue by this.PublishAsync, in which case it is not retried.
          // That contradicts the expectations for QoS 1 and 2, but so does the use of a
          // queue cap, so this is consistent with prior behavior.
          if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
          {
              await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          publishFailure = exception;

          _logger.Error(exception, "Error while publishing application message ({0}).", message.Id);
      }
      finally
      {
          var processedHandler = ApplicationMessageProcessedHandler;
          if (processedHandler != null)
          {
              var processedEventArgs = new ApplicationMessageProcessedEventArgs(message, publishFailure);
              await processedHandler.HandleApplicationMessageProcessedAsync(processedEventArgs).ConfigureAwait(false);
          }
      }
  }

  // Removes the message from the head of the queue and from the storage (when configured).
  // The queue lock avoids a conflict with this.PublishAsync. While the message was being
  // published, this.PublishAsync could have booted it off the queue to make room for
  // another one (cap with DropOldestQueuedMessage); the queue entry is therefore only
  // removed when the first item still has the same id.
  async Task RemoveQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
      {
          _messageQueue.RemoveFirst(queued => queued.Id.Equals(message.Id));

          if (_storageManager != null)
          {
              await _storageManager.RemoveAsync(message).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Waits up to <paramref name="timeout"/> for queued subscription changes and sends them
  /// to the broker. Changes are also folded into <see cref="_reconnectSubscriptions"/> so
  /// they can be sent again after a reconnect.
  /// </summary>
  /// <param name="timeout">Total time budget for waiting on the signal.</param>
  /// <param name="cancellationToken">Token that aborts the wait.</param>
  async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      var deadline = DateTime.UtcNow + timeout;

      while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(deadline), cancellationToken).ConfigureAwait(false))
      {
          var pendingSubscriptions = new List<MqttTopicFilter>();
          HashSet<string> pendingUnsubscriptions;

          // Take a snapshot of the pending changes and reset them under the lock.
          lock (_subscriptions)
          {
              foreach (var entry in _subscriptions)
              {
                  pendingSubscriptions.Add(new MqttTopicFilter
                  {
                      Topic = entry.Key,
                      QualityOfServiceLevel = entry.Value
                  });
              }

              _subscriptions.Clear();

              pendingUnsubscriptions = new HashSet<string>(_unsubscriptions);
              _unsubscriptions.Clear();
          }

          if (pendingSubscriptions.Count == 0 && pendingUnsubscriptions.Count == 0)
          {
              continue;
          }

          _logger.Verbose("Publishing {0} added and {1} removed subscriptions", pendingSubscriptions.Count, pendingUnsubscriptions.Count);

          // Update the reconnect bucket first: removals, then additions.
          foreach (var removedTopic in pendingUnsubscriptions)
          {
              _reconnectSubscriptions.Remove(removedTopic);
          }

          foreach (var addedFilter in pendingSubscriptions)
          {
              _reconnectSubscriptions[addedFilter.Topic] = addedFilter.QualityOfServiceLevel;
          }

          // Send the subscriptions in packets of at most the configured number of filters.
          var subscribeBatch = new List<MqttTopicFilter>();
          foreach (var addedFilter in pendingSubscriptions)
          {
              subscribeBatch.Add(addedFilter);

              if (subscribeBatch.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
              {
                  await SendSubscribeUnsubscribe(subscribeBatch, null).ConfigureAwait(false);
                  subscribeBatch.Clear();
              }
          }

          // Remainder; an empty batch is ignored by SendSubscribeUnsubscribe.
          await SendSubscribeUnsubscribe(subscribeBatch, null).ConfigureAwait(false);

          // Same batching for the unsubscriptions.
          var unsubscribeBatch = new List<string>();
          foreach (var removedTopic in pendingUnsubscriptions)
          {
              unsubscribeBatch.Add(removedTopic);

              if (unsubscribeBatch.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
              {
                  await SendSubscribeUnsubscribe(null, unsubscribeBatch).ConfigureAwait(false);
                  unsubscribeBatch.Clear();
              }
          }

          await SendSubscribeUnsubscribe(null, unsubscribeBatch).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Sends one batch of unsubscriptions and/or subscriptions through the wrapped client.
  /// Unsubscriptions are sent first. <c>null</c> or empty lists are skipped. Any failure is
  /// passed to <see cref="HandleSubscriptionExceptionAsync"/> instead of being thrown.
  /// </summary>
  /// <param name="addedSubscriptions">Topic filters to subscribe to, or <c>null</c>.</param>
  /// <param name="removedSubscriptions">Topics to unsubscribe from, or <c>null</c>.</param>
  async Task SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
  {
      try
      {
          var hasRemovals = removedSubscriptions != null && removedSubscriptions.Count > 0;
          if (hasRemovals)
          {
              var topics = removedSubscriptions.ToArray();
              await InternalClient.UnsubscribeAsync(topics).ConfigureAwait(false);
          }

          var hasAdditions = addedSubscriptions != null && addedSubscriptions.Count > 0;
          if (hasAdditions)
          {
              var topicFilters = addedSubscriptions.ToArray();
              await InternalClient.SubscribeAsync(topicFilters).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Sends all subscriptions remembered in <see cref="_reconnectSubscriptions"/> to the
  /// broker again, in packets of at most the configured number of topic filters.
  /// </summary>
  async Task PublishReconnectSubscriptionsAsync()
  {
      _logger.Info("Publishing subscriptions at reconnect");

      try
      {
          if (_reconnectSubscriptions.Count == 0)
          {
              return;
          }

          var batch = new List<MqttTopicFilter>();
          foreach (var entry in _reconnectSubscriptions)
          {
              batch.Add(new MqttTopicFilter
              {
                  Topic = entry.Key,
                  QualityOfServiceLevel = entry.Value
              });

              if (batch.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
              {
                  await SendSubscribeUnsubscribe(batch, null).ConfigureAwait(false);
                  batch.Clear();
              }
          }

          // Remainder; an empty batch is ignored by SendSubscribeUnsubscribe.
          await SendSubscribeUnsubscribe(batch, null).ConfigureAwait(false);
      }
      catch (Exception exception)
      {
          await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Logs a failed subscription synchronization and forwards it to
  /// <see cref="SynchronizingSubscriptionsFailedHandler"/> when one is set.
  /// </summary>
  /// <param name="exception">The exception raised while sending (un)subscriptions.</param>
  async Task HandleSubscriptionExceptionAsync(Exception exception)
  {
      _logger.Warning(exception, "Synchronizing subscriptions failed.");

      // Read the property once and test that same local. Testing the property again could
      // pass while the captured local is null (or the other way round) when the handler is
      // replaced concurrently, which would end in a NullReferenceException.
      var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
      if (synchronizingSubscriptionsFailedHandler != null)
      {
          await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Connects the wrapped client when it is not connected and reports the outcome.
  /// </summary>
  /// <param name="cancellationToken">Token passed to the connect call.</param>
  /// <returns>
  /// <c>StillConnected</c> when no connect was necessary; <c>Recovered</c> when the connect
  /// result reports an existing session; <c>Reconnected</c> when it does not;
  /// <c>NotConnected</c> when connecting threw (the exception is passed to
  /// <see cref="ConnectingFailedHandler"/>).
  /// </returns>
  async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancellationToken)
  {
      if (InternalClient.IsConnected)
      {
          return ReconnectionResult.StillConnected;
      }

      try
      {
          var connectResult = await InternalClient.ConnectAsync(Options.ClientOptions, cancellationToken).ConfigureAwait(false);

          if (connectResult.IsSessionPresent)
          {
              return ReconnectionResult.Recovered;
          }

          return ReconnectionResult.Reconnected;
      }
      catch (Exception exception)
      {
          var failedHandler = ConnectingFailedHandler;
          if (failedHandler != null)
          {
              var failedEventArgs = new ManagedProcessFailedEventArgs(exception);
              await failedHandler.HandleConnectingFailedAsync(failedEventArgs).ConfigureAwait(false);
          }

          return ReconnectionResult.NotConnected;
      }
  }
  /// <summary>
  /// Starts the background publishing loop with a fresh cancellation token source,
  /// stopping a previously started loop first.
  /// </summary>
  void StartPublishing()
  {
      if (_publishingCancellationToken != null)
      {
          StopPublishing();
      }

      // Take the token before publishing the source to the field so the background task
      // never has to read the field.
      var publishingTokenSource = new CancellationTokenSource();
      var publishingToken = publishingTokenSource.Token;
      _publishingCancellationToken = publishingTokenSource;

      var publishingTask = Task.Run(() => PublishQueuedMessagesAsync(publishingToken), publishingToken);
      publishingTask.RunInBackground(_logger);
  }
  /// <summary>
  /// Cancels the publishing loop (if any), then disposes and forgets its token source.
  /// </summary>
  void StopPublishing()
  {
      try
      {
          _publishingCancellationToken?.Cancel(throwOnFirstException: false);
      }
      finally
      {
          // Also runs when a cancellation callback threw.
          _publishingCancellationToken?.Dispose();
          _publishingCancellationToken = null;
      }
  }
  /// <summary>
  /// Cancels the connection maintenance loop (if any), then disposes and forgets its token
  /// source; afterwards <see cref="IsStarted"/> is <c>false</c>.
  /// </summary>
  void StopMaintainingConnection()
  {
      try
      {
          _connectionCancellationToken?.Cancel(throwOnFirstException: false);
      }
      finally
      {
          // Also runs when a cancellation callback threw.
          _connectionCancellationToken?.Dispose();
          _connectionCancellationToken = null;
      }
  }
  /// <summary>
  /// Returns the time left until <paramref name="endTime"/> (compared against the current
  /// UTC time), clamped so that it is never negative.
  /// </summary>
  /// <param name="endTime">The deadline, expressed in UTC.</param>
  /// <returns>The remaining time, or <see cref="TimeSpan.Zero"/> when the deadline has passed.</returns>
  static TimeSpan GetRemainingTime(DateTime endTime)
  {
      var timeLeft = endTime - DateTime.UtcNow;

      if (timeLeft < TimeSpan.Zero)
      {
          return TimeSpan.Zero;
      }

      return timeLeft;
  }
  568. }
  569. }