Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

626 rindas
24 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : Disposable, IManagedMqttClient
  19. {
  20. readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  21. /// <summary>
  22. /// The subscriptions are managed in 2 separate buckets:
  23. /// <see cref="_subscriptions"/> and <see cref="_unsubscriptions"/> are processed during normal operation
  24. /// and are moved to the <see cref="_reconnectSubscriptions"/> when they get processed. They can be accessed by
  25. /// any thread and are therefore mutex'ed. <see cref="_reconnectSubscriptions"/> get sent to the broker
  26. /// at reconnect and are solely owned by <see cref="MaintainConnectionAsync"/>.
  27. /// </summary>
  28. readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  29. readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  30. readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  31. readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);
  32. readonly IMqttNetScopedLogger _logger;
  33. readonly AsyncLock _messageQueueLock = new AsyncLock();
  34. CancellationTokenSource _connectionCancellationToken;
  35. CancellationTokenSource _publishingCancellationToken;
  36. Task _maintainConnectionTask;
  37. ManagedMqttClientStorageManager _storageManager;
  38. public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  39. {
  40. InternalClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  41. if (logger == null) throw new ArgumentNullException(nameof(logger));
  42. _logger = logger.CreateScopedLogger(nameof(ManagedMqttClient));
  43. }
  44. public bool IsConnected => InternalClient.IsConnected;
  45. public bool IsStarted => _connectionCancellationToken != null;
  46. public IMqttClient InternalClient { get; }
  47. public int PendingApplicationMessagesCount => _messageQueue.Count;
  48. public IManagedMqttClientOptions Options { get; private set; }
  49. public IMqttClientConnectedHandler ConnectedHandler
  50. {
  51. get => InternalClient.ConnectedHandler;
  52. set => InternalClient.ConnectedHandler = value;
  53. }
  54. public IMqttClientDisconnectedHandler DisconnectedHandler
  55. {
  56. get => InternalClient.DisconnectedHandler;
  57. set => InternalClient.DisconnectedHandler = value;
  58. }
  59. public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  60. {
  61. get => InternalClient.ApplicationMessageReceivedHandler;
  62. set => InternalClient.ApplicationMessageReceivedHandler = value;
  63. }
  64. public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }
  65. public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }
  66. public IConnectingFailedHandler ConnectingFailedHandler { get; set; }
  67. public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  68. public async Task StartAsync(IManagedMqttClientOptions options)
  69. {
  70. ThrowIfDisposed();
  71. if (options == null) throw new ArgumentNullException(nameof(options));
  72. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  73. if (!_maintainConnectionTask?.IsCompleted ?? false) throw new InvalidOperationException("The managed client is already started.");
  74. Options = options;
  75. if (options.Storage != null)
  76. {
  77. _storageManager = new ManagedMqttClientStorageManager(options.Storage);
  78. var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
  79. foreach (var message in messages)
  80. {
  81. _messageQueue.Enqueue(message);
  82. }
  83. }
  84. var cancellationTokenSource = new CancellationTokenSource();
  85. var cancellationToken = cancellationTokenSource.Token;
  86. _connectionCancellationToken = cancellationTokenSource;
  87. _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
  88. _maintainConnectionTask.RunInBackground(_logger);
  89. _logger.Info("Started");
  90. }
  91. public async Task StopAsync()
  92. {
  93. ThrowIfDisposed();
  94. StopPublishing();
  95. StopMaintainingConnection();
  96. _messageQueue.Clear();
  97. if (_maintainConnectionTask != null)
  98. {
  99. await Task.WhenAny(_maintainConnectionTask);
  100. _maintainConnectionTask = null;
  101. }
  102. }
  103. public Task PingAsync(CancellationToken cancellationToken)
  104. {
  105. return InternalClient.PingAsync(cancellationToken);
  106. }
  107. public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  108. {
  109. ThrowIfDisposed();
  110. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  111. await PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()).ConfigureAwait(false);
  112. return new MqttClientPublishResult();
  113. }
  114. public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  115. {
  116. ThrowIfDisposed();
  117. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  118. if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages");
  119. MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);
  120. ManagedMqttApplicationMessage removedMessage = null;
  121. ApplicationMessageSkippedEventArgs applicationMessageSkippedEventArgs = null;
  122. try
  123. {
  124. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
  125. {
  126. if (_messageQueue.Count >= Options.MaxPendingMessages)
  127. {
  128. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  129. {
  130. _logger.Verbose("Skipping publish of new application message because internal queue is full.");
  131. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
  132. return;
  133. }
  134. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  135. {
  136. removedMessage = _messageQueue.RemoveFirst();
  137. _logger.Verbose("Removed oldest application message from internal queue because it is full.");
  138. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
  139. }
  140. }
  141. _messageQueue.Enqueue(applicationMessage);
  142. if (_storageManager != null)
  143. {
  144. if (removedMessage != null)
  145. {
  146. await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
  147. }
  148. await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  149. }
  150. }
  151. }
  152. finally
  153. {
  154. if (applicationMessageSkippedEventArgs != null)
  155. {
  156. var applicationMessageSkippedHandler = ApplicationMessageSkippedHandler;
  157. if (applicationMessageSkippedHandler != null)
  158. {
  159. await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
  160. }
  161. }
  162. }
  163. }
  164. public Task SubscribeAsync(IEnumerable<MqttTopicFilter> topicFilters)
  165. {
  166. ThrowIfDisposed();
  167. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  168. foreach (var topicFilter in topicFilters)
  169. {
  170. MqttTopicValidator.ThrowIfInvalidSubscribe(topicFilter.Topic);
  171. }
  172. lock (_subscriptions)
  173. {
  174. foreach (var topicFilter in topicFilters)
  175. {
  176. _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
  177. _unsubscriptions.Remove(topicFilter.Topic);
  178. }
  179. }
  180. _subscriptionsQueuedSignal.Release();
  181. return Task.FromResult(0);
  182. }
  183. public Task UnsubscribeAsync(IEnumerable<string> topics)
  184. {
  185. ThrowIfDisposed();
  186. if (topics == null) throw new ArgumentNullException(nameof(topics));
  187. lock (_subscriptions)
  188. {
  189. foreach (var topic in topics)
  190. {
  191. _subscriptions.Remove(topic);
  192. _unsubscriptions.Add(topic);
  193. }
  194. }
  195. _subscriptionsQueuedSignal.Release();
  196. return Task.FromResult(0);
  197. }
  198. protected override void Dispose(bool disposing)
  199. {
  200. if (disposing)
  201. {
  202. StopPublishing();
  203. StopMaintainingConnection();
  204. if (_maintainConnectionTask != null)
  205. {
  206. _maintainConnectionTask.GetAwaiter().GetResult();
  207. _maintainConnectionTask = null;
  208. }
  209. _messageQueue.Dispose();
  210. _messageQueueLock.Dispose();
  211. InternalClient.Dispose();
  212. _subscriptionsQueuedSignal.Dispose();
  213. }
  214. base.Dispose(disposing);
  215. }
  216. async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  217. {
  218. try
  219. {
  220. while (!cancellationToken.IsCancellationRequested)
  221. {
  222. await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
  223. }
  224. }
  225. catch (OperationCanceledException)
  226. {
  227. }
  228. catch (Exception exception)
  229. {
  230. _logger.Error(exception, "Error exception while maintaining connection.");
  231. }
  232. finally
  233. {
  234. if (!IsDisposed)
  235. {
  236. try
  237. {
  238. using (var disconnectTimeout = new CancellationTokenSource(Options.ClientOptions.CommunicationTimeout))
  239. {
  240. await InternalClient.DisconnectAsync(disconnectTimeout.Token).ConfigureAwait(false);
  241. }
  242. }
  243. catch (OperationCanceledException)
  244. {
  245. _logger.Warning("Timeout while sending DISCONNECT packet.");
  246. }
  247. catch (Exception exception)
  248. {
  249. _logger.Error(exception, "Error while disconnecting.");
  250. }
  251. _logger.Info("Stopped");
  252. }
  253. _reconnectSubscriptions.Clear();
  254. lock (_subscriptions)
  255. {
  256. _subscriptions.Clear();
  257. _unsubscriptions.Clear();
  258. }
  259. }
  260. }
  261. async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  262. {
  263. try
  264. {
  265. var connectionState = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false);
  266. if (connectionState == ReconnectionResult.NotConnected)
  267. {
  268. StopPublishing();
  269. await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  270. return;
  271. }
  272. if (connectionState == ReconnectionResult.Reconnected)
  273. {
  274. await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
  275. StartPublishing();
  276. return;
  277. }
  278. if (connectionState == ReconnectionResult.Recovered)
  279. {
  280. StartPublishing();
  281. return;
  282. }
  283. if (connectionState == ReconnectionResult.StillConnected)
  284. {
  285. await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
  286. }
  287. }
  288. catch (OperationCanceledException)
  289. {
  290. }
  291. catch (MqttCommunicationException exception)
  292. {
  293. _logger.Warning(exception, "Communication error while maintaining connection.");
  294. }
  295. catch (Exception exception)
  296. {
  297. _logger.Error(exception, "Error exception while maintaining connection.");
  298. }
  299. }
  300. async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  301. {
  302. try
  303. {
  304. while (!cancellationToken.IsCancellationRequested && InternalClient.IsConnected)
  305. {
  306. // Peek at the message without dequeueing in order to prevent the
  307. // possibility of the queue growing beyond the configured cap.
  308. // Previously, messages could be re-enqueued if there was an
  309. // exception, and this re-enqueueing did not honor the cap.
  310. // Furthermore, because re-enqueueing would shuffle the order
  311. // of the messages, the DropOldestQueuedMessage strategy would
  312. // be unable to know which message is actually the oldest and would
  313. // instead drop the first item in the queue.
  314. var message = _messageQueue.PeekAndWait(cancellationToken);
  315. if (message == null)
  316. {
  317. continue;
  318. }
  319. cancellationToken.ThrowIfCancellationRequested();
  320. await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
  321. }
  322. }
  323. catch (OperationCanceledException)
  324. {
  325. }
  326. catch (Exception exception)
  327. {
  328. _logger.Error(exception, "Error while publishing queued application messages.");
  329. }
  330. finally
  331. {
  332. _logger.Verbose("Stopped publishing messages.");
  333. }
  334. }
  335. async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  336. {
  337. Exception transmitException = null;
  338. try
  339. {
  340. await InternalClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
  341. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  342. {
  343. // While publishing this message, this.PublishAsync could have booted this
  344. // message off the queue to make room for another (when using a cap
  345. // with the DropOldestQueuedMessage strategy). If the first item
  346. // in the queue is equal to this message, then it's safe to remove
  347. // it from the queue. If not, that means this.PublishAsync has already
  348. // removed it, in which case we don't want to do anything.
  349. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  350. if (_storageManager != null)
  351. {
  352. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  353. }
  354. }
  355. }
  356. catch (MqttCommunicationException exception)
  357. {
  358. transmitException = exception;
  359. _logger.Warning(exception, "Publishing application message ({0}) failed.", message.Id);
  360. if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  361. {
  362. //If QoS 0, we don't want this message to stay on the queue.
  363. //If QoS 1 or 2, it's possible that, when using a cap, this message
  364. //has been booted off the queue by this.PublishAsync, in which case this
  365. //thread will not continue to try to publish it. While this does
  366. //contradict the expected behavior of QoS 1 and 2, that's also true
  367. //for the usage of a message queue cap, so it's still consistent
  368. //with prior behavior in that way.
  369. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  370. {
  371. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  372. if (_storageManager != null)
  373. {
  374. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  375. }
  376. }
  377. }
  378. }
  379. catch (Exception exception)
  380. {
  381. transmitException = exception;
  382. _logger.Error(exception, "Error while publishing application message ({0}).", message.Id);
  383. }
  384. finally
  385. {
  386. var eventHandler = ApplicationMessageProcessedHandler;
  387. if (eventHandler != null)
  388. {
  389. var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException);
  390. await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false);
  391. }
  392. }
  393. }
  394. async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  395. {
  396. var endTime = DateTime.UtcNow + timeout;
  397. while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false))
  398. {
  399. List<MqttTopicFilter> subscriptions;
  400. HashSet<string> unsubscriptions;
  401. lock (_subscriptions)
  402. {
  403. subscriptions = _subscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
  404. _subscriptions.Clear();
  405. unsubscriptions = new HashSet<string>(_unsubscriptions);
  406. _unsubscriptions.Clear();
  407. }
  408. if (!subscriptions.Any() && !unsubscriptions.Any())
  409. {
  410. continue;
  411. }
  412. _logger.Verbose("Publishing {0} subscriptions and {1} unsubscriptions)", subscriptions.Count, unsubscriptions.Count);
  413. foreach (var unsubscription in unsubscriptions)
  414. {
  415. _reconnectSubscriptions.Remove(unsubscription);
  416. }
  417. foreach (var subscription in subscriptions)
  418. {
  419. _reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
  420. }
  421. try
  422. {
  423. if (unsubscriptions.Any())
  424. {
  425. await InternalClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
  426. }
  427. if (subscriptions.Any())
  428. {
  429. await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  430. }
  431. }
  432. catch (Exception exception)
  433. {
  434. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  435. }
  436. }
  437. }
  438. async Task PublishReconnectSubscriptionsAsync()
  439. {
  440. _logger.Info("Publishing subscriptions at reconnect");
  441. try
  442. {
  443. if (_reconnectSubscriptions.Any())
  444. {
  445. var subscriptions = _reconnectSubscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
  446. await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  447. }
  448. }
  449. catch (Exception exception)
  450. {
  451. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  452. }
  453. }
  454. async Task HandleSubscriptionExceptionAsync(Exception exception)
  455. {
  456. _logger.Warning(exception, "Synchronizing subscriptions failed.");
  457. var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
  458. if (SynchronizingSubscriptionsFailedHandler != null)
  459. {
  460. await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  461. }
  462. }
  463. async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancellationToken)
  464. {
  465. if (InternalClient.IsConnected)
  466. {
  467. return ReconnectionResult.StillConnected;
  468. }
  469. try
  470. {
  471. var result = await InternalClient.ConnectAsync(Options.ClientOptions, cancellationToken).ConfigureAwait(false);
  472. return result.IsSessionPresent ? ReconnectionResult.Recovered : ReconnectionResult.Reconnected;
  473. }
  474. catch (Exception exception)
  475. {
  476. var connectingFailedHandler = ConnectingFailedHandler;
  477. if (connectingFailedHandler != null)
  478. {
  479. await connectingFailedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  480. }
  481. return ReconnectionResult.NotConnected;
  482. }
  483. }
  484. void StartPublishing()
  485. {
  486. if (_publishingCancellationToken != null)
  487. {
  488. StopPublishing();
  489. }
  490. var cancellationTokenSource = new CancellationTokenSource();
  491. var cancellationToken = cancellationTokenSource.Token;
  492. _publishingCancellationToken = cancellationTokenSource;
  493. Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken).RunInBackground(_logger);
  494. }
  495. void StopPublishing()
  496. {
  497. try
  498. {
  499. _publishingCancellationToken?.Cancel(false);
  500. }
  501. finally
  502. {
  503. _publishingCancellationToken?.Dispose();
  504. _publishingCancellationToken = null;
  505. }
  506. }
  507. void StopMaintainingConnection()
  508. {
  509. try
  510. {
  511. _connectionCancellationToken?.Cancel(false);
  512. }
  513. finally
  514. {
  515. _connectionCancellationToken?.Dispose();
  516. _connectionCancellationToken = null;
  517. }
  518. }
  519. static TimeSpan GetRemainingTime(DateTime endTime)
  520. {
  521. var remainingTime = endTime - DateTime.UtcNow;
  522. return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
  523. }
  524. }
  525. }