You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

615 lines
24 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : Disposable, IManagedMqttClient
  19. {
  20. readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  21. /// <summary>
  22. /// The subscriptions are managed in 2 separate buckets:
  23. /// <see cref="_subscriptions"/> and <see cref="_unsubscriptions"/> are processed during normal operation
  24. /// and are moved to the <see cref="_reconnectSubscriptions"/> when they get processed. They can be accessed by
  25. /// any thread and are therefore mutex'ed. <see cref="_reconnectSubscriptions"/> get sent to the broker
  26. /// at reconnect and are solely owned by <see cref="MaintainConnectionAsync"/>.
  27. /// </summary>
  28. readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  29. readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  30. readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  31. readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);
  32. readonly IMqttClient _mqttClient;
  33. readonly IMqttNetScopedLogger _logger;
  34. readonly AsyncLock _messageQueueLock = new AsyncLock();
  35. CancellationTokenSource _connectionCancellationToken;
  36. CancellationTokenSource _publishingCancellationToken;
  37. Task _maintainConnectionTask;
  38. ManagedMqttClientStorageManager _storageManager;
  39. public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  40. {
  41. _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  42. InternalClient = mqttClient;
  43. if (logger == null) throw new ArgumentNullException(nameof(logger));
  44. _logger = logger.CreateScopedLogger(nameof(ManagedMqttClient));
  45. }
  46. public bool IsConnected => _mqttClient.IsConnected;
  47. public bool IsStarted => _connectionCancellationToken != null;
  48. public IMqttClient InternalClient { get; }
  49. public int PendingApplicationMessagesCount => _messageQueue.Count;
  50. public IManagedMqttClientOptions Options { get; private set; }
  51. public IMqttClientConnectedHandler ConnectedHandler
  52. {
  53. get => _mqttClient.ConnectedHandler;
  54. set => _mqttClient.ConnectedHandler = value;
  55. }
  56. public IMqttClientDisconnectedHandler DisconnectedHandler
  57. {
  58. get => _mqttClient.DisconnectedHandler;
  59. set => _mqttClient.DisconnectedHandler = value;
  60. }
  61. public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  62. {
  63. get => _mqttClient.ApplicationMessageReceivedHandler;
  64. set => _mqttClient.ApplicationMessageReceivedHandler = value;
  65. }
  66. public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }
  67. public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }
  68. public IConnectingFailedHandler ConnectingFailedHandler { get; set; }
  69. public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  70. public async Task StartAsync(IManagedMqttClientOptions options)
  71. {
  72. ThrowIfDisposed();
  73. if (options == null) throw new ArgumentNullException(nameof(options));
  74. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  75. if (!_maintainConnectionTask?.IsCompleted ?? false) throw new InvalidOperationException("The managed client is already started.");
  76. Options = options;
  77. if (Options.Storage != null)
  78. {
  79. _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
  80. var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
  81. foreach (var message in messages)
  82. {
  83. _messageQueue.Enqueue(message);
  84. }
  85. }
  86. if (Options.AutoReconnect)
  87. {
  88. var cancellationTokenSource = new CancellationTokenSource();
  89. var cancellationToken = cancellationTokenSource.Token;
  90. _connectionCancellationToken = cancellationTokenSource;
  91. _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
  92. _maintainConnectionTask.Forget(_logger);
  93. }
  94. _logger.Info("Started");
  95. }
  96. public async Task StopAsync()
  97. {
  98. ThrowIfDisposed();
  99. StopPublishing();
  100. StopMaintainingConnection();
  101. _messageQueue.Clear();
  102. if (_maintainConnectionTask != null)
  103. {
  104. await Task.WhenAny(_maintainConnectionTask);
  105. _maintainConnectionTask = null;
  106. }
  107. }
  108. public Task PingAsync(CancellationToken cancellationToken)
  109. {
  110. return _mqttClient.PingAsync(cancellationToken);
  111. }
  112. public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  113. {
  114. ThrowIfDisposed();
  115. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  116. await PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()).ConfigureAwait(false);
  117. return new MqttClientPublishResult();
  118. }
  119. public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  120. {
  121. ThrowIfDisposed();
  122. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  123. if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages");
  124. MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);
  125. ManagedMqttApplicationMessage removedMessage = null;
  126. ApplicationMessageSkippedEventArgs applicationMessageSkippedEventArgs = null;
  127. try
  128. {
  129. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
  130. {
  131. if (_messageQueue.Count >= Options.MaxPendingMessages)
  132. {
  133. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  134. {
  135. _logger.Verbose("Skipping publish of new application message because internal queue is full.");
  136. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
  137. return;
  138. }
  139. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  140. {
  141. removedMessage = _messageQueue.RemoveFirst();
  142. _logger.Verbose("Removed oldest application message from internal queue because it is full.");
  143. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
  144. }
  145. }
  146. _messageQueue.Enqueue(applicationMessage);
  147. if (_storageManager != null)
  148. {
  149. if (removedMessage != null)
  150. {
  151. await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
  152. }
  153. await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  154. }
  155. }
  156. }
  157. finally
  158. {
  159. if (applicationMessageSkippedEventArgs != null)
  160. {
  161. var applicationMessageSkippedHandler = ApplicationMessageSkippedHandler;
  162. if (applicationMessageSkippedHandler != null)
  163. {
  164. await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
  165. }
  166. }
  167. }
  168. }
  169. public Task SubscribeAsync(IEnumerable<MqttTopicFilter> topicFilters)
  170. {
  171. ThrowIfDisposed();
  172. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  173. lock (_subscriptions)
  174. {
  175. foreach (var topicFilter in topicFilters)
  176. {
  177. _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
  178. _unsubscriptions.Remove(topicFilter.Topic);
  179. }
  180. }
  181. _subscriptionsQueuedSignal.Release();
  182. return Task.FromResult(0);
  183. }
  184. public Task UnsubscribeAsync(IEnumerable<string> topics)
  185. {
  186. ThrowIfDisposed();
  187. if (topics == null) throw new ArgumentNullException(nameof(topics));
  188. lock (_subscriptions)
  189. {
  190. foreach (var topic in topics)
  191. {
  192. _subscriptions.Remove(topic);
  193. _unsubscriptions.Add(topic);
  194. }
  195. }
  196. _subscriptionsQueuedSignal.Release();
  197. return Task.FromResult(0);
  198. }
  199. protected override void Dispose(bool disposing)
  200. {
  201. if (disposing)
  202. {
  203. StopPublishing();
  204. StopMaintainingConnection();
  205. if (_maintainConnectionTask != null)
  206. {
  207. _maintainConnectionTask.GetAwaiter().GetResult();
  208. _maintainConnectionTask = null;
  209. }
  210. _messageQueue.Dispose();
  211. _messageQueueLock.Dispose();
  212. _mqttClient.Dispose();
  213. _subscriptionsQueuedSignal.Dispose();
  214. }
  215. base.Dispose(disposing);
  216. }
  217. async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  218. {
  219. try
  220. {
  221. while (!cancellationToken.IsCancellationRequested)
  222. {
  223. await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
  224. }
  225. }
  226. catch (OperationCanceledException)
  227. {
  228. }
  229. catch (Exception exception)
  230. {
  231. _logger.Error(exception, "Error exception while maintaining connection.");
  232. }
  233. finally
  234. {
  235. if (!IsDisposed)
  236. {
  237. try
  238. {
  239. await _mqttClient.DisconnectAsync().ConfigureAwait(false);
  240. }
  241. catch (Exception exception)
  242. {
  243. _logger.Error(exception, "Error while disconnecting.");
  244. }
  245. _logger.Info("Stopped");
  246. }
  247. _reconnectSubscriptions.Clear();
  248. lock (_subscriptions)
  249. {
  250. _subscriptions.Clear();
  251. _unsubscriptions.Clear();
  252. }
  253. }
  254. }
  255. async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  256. {
  257. try
  258. {
  259. var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
  260. if (connectionState == ReconnectionResult.NotConnected)
  261. {
  262. StopPublishing();
  263. await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  264. return;
  265. }
  266. if (connectionState == ReconnectionResult.Reconnected)
  267. {
  268. await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
  269. StartPublishing();
  270. return;
  271. }
  272. if (connectionState == ReconnectionResult.Recovered)
  273. {
  274. StartPublishing();
  275. return;
  276. }
  277. if (connectionState == ReconnectionResult.StillConnected)
  278. {
  279. await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
  280. }
  281. }
  282. catch (OperationCanceledException)
  283. {
  284. }
  285. catch (MqttCommunicationException exception)
  286. {
  287. _logger.Warning(exception, "Communication error while maintaining connection.");
  288. }
  289. catch (Exception exception)
  290. {
  291. _logger.Error(exception, "Error exception while maintaining connection.");
  292. }
  293. }
  294. async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  295. {
  296. try
  297. {
  298. while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
  299. {
  300. // Peek at the message without dequeueing in order to prevent the
  301. // possibility of the queue growing beyond the configured cap.
  302. // Previously, messages could be re-enqueued if there was an
  303. // exception, and this re-enqueueing did not honor the cap.
  304. // Furthermore, because re-enqueueing would shuffle the order
  305. // of the messages, the DropOldestQueuedMessage strategy would
  306. // be unable to know which message is actually the oldest and would
  307. // instead drop the first item in the queue.
  308. var message = _messageQueue.PeekAndWait(cancellationToken);
  309. if (message == null)
  310. {
  311. continue;
  312. }
  313. cancellationToken.ThrowIfCancellationRequested();
  314. await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
  315. }
  316. }
  317. catch (OperationCanceledException)
  318. {
  319. }
  320. catch (Exception exception)
  321. {
  322. _logger.Error(exception, "Error while publishing queued application messages.");
  323. }
  324. finally
  325. {
  326. _logger.Verbose("Stopped publishing messages.");
  327. }
  328. }
  329. async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  330. {
  331. Exception transmitException = null;
  332. try
  333. {
  334. await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
  335. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  336. {
  337. // While publishing this message, this.PublishAsync could have booted this
  338. // message off the queue to make room for another (when using a cap
  339. // with the DropOldestQueuedMessage strategy). If the first item
  340. // in the queue is equal to this message, then it's safe to remove
  341. // it from the queue. If not, that means this.PublishAsync has already
  342. // removed it, in which case we don't want to do anything.
  343. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  344. if (_storageManager != null)
  345. {
  346. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  347. }
  348. }
  349. }
  350. catch (MqttCommunicationException exception)
  351. {
  352. transmitException = exception;
  353. _logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");
  354. if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  355. {
  356. //If QoS 0, we don't want this message to stay on the queue.
  357. //If QoS 1 or 2, it's possible that, when using a cap, this message
  358. //has been booted off the queue by this.PublishAsync, in which case this
  359. //thread will not continue to try to publish it. While this does
  360. //contradict the expected behavior of QoS 1 and 2, that's also true
  361. //for the usage of a message queue cap, so it's still consistent
  362. //with prior behavior in that way.
  363. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  364. {
  365. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  366. if (_storageManager != null)
  367. {
  368. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  369. }
  370. }
  371. }
  372. }
  373. catch (Exception exception)
  374. {
  375. transmitException = exception;
  376. _logger.Error(exception, $"Error while publishing application message ({message.Id}).");
  377. }
  378. finally
  379. {
  380. var eventHandler = ApplicationMessageProcessedHandler;
  381. if (eventHandler != null)
  382. {
  383. var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException);
  384. await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false);
  385. }
  386. }
  387. }
  388. async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  389. {
  390. var endTime = DateTime.UtcNow + timeout;
  391. while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false))
  392. {
  393. List<MqttTopicFilter> subscriptions;
  394. HashSet<string> unsubscriptions;
  395. lock (_subscriptions)
  396. {
  397. subscriptions = _subscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
  398. _subscriptions.Clear();
  399. unsubscriptions = new HashSet<string>(_unsubscriptions);
  400. _unsubscriptions.Clear();
  401. }
  402. if (!subscriptions.Any() && !unsubscriptions.Any())
  403. {
  404. continue;
  405. }
  406. _logger.Verbose($"Publishing subscriptions ({subscriptions.Count} subscriptions and {unsubscriptions.Count} unsubscriptions)");
  407. foreach (var unsubscription in unsubscriptions)
  408. {
  409. _reconnectSubscriptions.Remove(unsubscription);
  410. }
  411. foreach (var subscription in subscriptions)
  412. {
  413. _reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
  414. }
  415. try
  416. {
  417. if (unsubscriptions.Any())
  418. {
  419. await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
  420. }
  421. if (subscriptions.Any())
  422. {
  423. await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  424. }
  425. }
  426. catch (Exception exception)
  427. {
  428. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  429. }
  430. }
  431. }
  432. async Task PublishReconnectSubscriptionsAsync()
  433. {
  434. _logger.Info("Publishing subscriptions at reconnect");
  435. try
  436. {
  437. if (_reconnectSubscriptions.Any())
  438. {
  439. var subscriptions = _reconnectSubscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
  440. await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  441. }
  442. }
  443. catch (Exception exception)
  444. {
  445. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  446. }
  447. }
  448. async Task HandleSubscriptionExceptionAsync(Exception exception)
  449. {
  450. _logger.Warning(exception, "Synchronizing subscriptions failed.");
  451. var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
  452. if (SynchronizingSubscriptionsFailedHandler != null)
  453. {
  454. await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  455. }
  456. }
  457. async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  458. {
  459. if (_mqttClient.IsConnected)
  460. {
  461. return ReconnectionResult.StillConnected;
  462. }
  463. try
  464. {
  465. var result = await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
  466. return result.IsSessionPresent ? ReconnectionResult.Recovered : ReconnectionResult.Reconnected;
  467. }
  468. catch (Exception exception)
  469. {
  470. var connectingFailedHandler = ConnectingFailedHandler;
  471. if (connectingFailedHandler != null)
  472. {
  473. await connectingFailedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  474. }
  475. return ReconnectionResult.NotConnected;
  476. }
  477. }
  478. void StartPublishing()
  479. {
  480. if (_publishingCancellationToken != null)
  481. {
  482. StopPublishing();
  483. }
  484. var cancellationTokenSource = new CancellationTokenSource();
  485. var cancellationToken = cancellationTokenSource.Token;
  486. _publishingCancellationToken = cancellationTokenSource;
  487. Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken).Forget(_logger);
  488. }
  489. void StopPublishing()
  490. {
  491. try
  492. {
  493. _publishingCancellationToken?.Cancel(false);
  494. }
  495. finally
  496. {
  497. _publishingCancellationToken?.Dispose();
  498. _publishingCancellationToken = null;
  499. }
  500. }
  501. void StopMaintainingConnection()
  502. {
  503. try
  504. {
  505. _connectionCancellationToken?.Cancel(false);
  506. }
  507. finally
  508. {
  509. _connectionCancellationToken?.Dispose();
  510. _connectionCancellationToken = null;
  511. }
  512. }
  513. static TimeSpan GetRemainingTime(DateTime endTime)
  514. {
  515. var remainingTime = endTime - DateTime.UtcNow;
  516. return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
  517. }
  518. }
  519. }