Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

621 строка
24 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : Disposable, IManagedMqttClient
  19. {
  // ---- Outgoing application messages ----

  /// <summary>
  /// Application messages waiting to be sent by <see cref="PublishQueuedMessagesAsync"/>.
  /// </summary>
  readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

  /// <summary>
  /// Taken by <see cref="PublishAsync(ManagedMqttApplicationMessage)"/> and by
  /// <see cref="TryPublishQueuedMessageAsync"/> while they modify the queue and the storage.
  /// </summary>
  readonly AsyncLock _messageQueueLock = new AsyncLock();

  // ---- Subscription bookkeeping ----
  //
  // Subscriptions live in two buckets:
  //  * _subscriptions / _unsubscriptions hold the changes requested during normal operation.
  //    Any thread may touch them, so every access is done under lock (_subscriptions).
  //  * _reconnectSubscriptions receives those changes once they are processed and is what gets
  //    sent to the broker after a reconnect. It is owned solely by MaintainConnectionAsync.

  /// <summary>
  /// Pending subscribe requests (topic -> QoS). Guarded by <c>lock (_subscriptions)</c>.
  /// </summary>
  readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

  /// <summary>
  /// Pending unsubscribe requests. Guarded by <c>lock (_subscriptions)</c>.
  /// </summary>
  readonly HashSet<string> _unsubscriptions = new HashSet<string>();

  /// <summary>
  /// Subscriptions (topic -> QoS) that are re-sent to the broker at reconnect.
  /// </summary>
  readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

  /// <summary>
  /// Released by every subscribe/unsubscribe call; awaited by <see cref="PublishSubscriptionsAsync"/>.
  /// </summary>
  readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);

  // ---- Infrastructure and lifetime ----

  readonly IMqttNetScopedLogger _logger;

  /// <summary>Cancels the connection maintenance loop; non-null while the client is started.</summary>
  CancellationTokenSource _connectionCancellationToken;

  /// <summary>Cancels the background publisher started by <see cref="StartPublishing"/>.</summary>
  CancellationTokenSource _publishingCancellationToken;

  /// <summary>The task running <see cref="MaintainConnectionAsync"/>.</summary>
  Task _maintainConnectionTask;

  /// <summary>Created in <see cref="StartAsync"/> only when the options provide a storage.</summary>
  ManagedMqttClientStorageManager _storageManager;
  /// <summary>
  /// Creates a managed client on top of an existing MQTT client.
  /// </summary>
  /// <param name="mqttClient">The client that is exposed as <see cref="InternalClient"/>.</param>
  /// <param name="logger">The logger used to create the scoped logger of this instance.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="mqttClient"/> or <paramref name="logger"/> is null.
  /// </exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  {
      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      InternalClient = mqttClient;
      _logger = logger.CreateScopedLogger(nameof(ManagedMqttClient));
  }
  /// <summary>Gets the connection state reported by <see cref="InternalClient"/>.</summary>
  public bool IsConnected
  {
      get { return InternalClient.IsConnected; }
  }

  /// <summary>Gets whether a connection cancellation token source currently exists.</summary>
  public bool IsStarted
  {
      get { return _connectionCancellationToken != null; }
  }

  /// <summary>Gets the wrapped client passed to the constructor.</summary>
  public IMqttClient InternalClient { get; }

  /// <summary>Gets the number of application messages currently in the internal queue.</summary>
  public int PendingApplicationMessagesCount
  {
      get { return _messageQueue.Count; }
  }

  /// <summary>Gets the options passed to the most recent <see cref="StartAsync"/> call.</summary>
  public IManagedMqttClientOptions Options { get; private set; }

  /// <summary>Forwards to the connected handler of <see cref="InternalClient"/>.</summary>
  public IMqttClientConnectedHandler ConnectedHandler
  {
      get { return InternalClient.ConnectedHandler; }
      set { InternalClient.ConnectedHandler = value; }
  }

  /// <summary>Forwards to the disconnected handler of <see cref="InternalClient"/>.</summary>
  public IMqttClientDisconnectedHandler DisconnectedHandler
  {
      get { return InternalClient.DisconnectedHandler; }
      set { InternalClient.DisconnectedHandler = value; }
  }

  /// <summary>Forwards to the application-message-received handler of <see cref="InternalClient"/>.</summary>
  public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  {
      get { return InternalClient.ApplicationMessageReceivedHandler; }
      set { InternalClient.ApplicationMessageReceivedHandler = value; }
  }

  /// <summary>Invoked after each attempt to publish a queued message, whether it succeeded or failed.</summary>
  public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

  /// <summary>Invoked when a message is dropped because the internal queue is full.</summary>
  public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

  /// <summary>Invoked when connecting the internal client throws.</summary>
  public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

  /// <summary>Invoked when sending subscriptions or unsubscriptions to the broker throws.</summary>
  public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  /// <summary>
  /// Starts the managed client: stores the options, restores persisted messages (when a storage
  /// is configured) and launches the background connection maintenance task.
  /// </summary>
  /// <param name="options">The managed client options; <c>ClientOptions</c> must be set.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException">The client options are not set.</exception>
  /// <exception cref="InvalidOperationException">A maintenance task is still running.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      ThrowIfDisposed();

      if (options == null)
      {
          throw new ArgumentNullException(nameof(options));
      }

      if (options.ClientOptions == null)
      {
          throw new ArgumentException("The client options are not set.", nameof(options));
      }

      var runningTask = _maintainConnectionTask;
      if (runningTask != null && !runningTask.IsCompleted)
      {
          throw new InvalidOperationException("The managed client is already started.");
      }

      Options = options;

      if (options.Storage != null)
      {
          // Messages persisted by a previous run are put back into the queue before publishing starts.
          _storageManager = new ManagedMqttClientStorageManager(options.Storage);
          var storedMessages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
          foreach (var storedMessage in storedMessages)
          {
              _messageQueue.Enqueue(storedMessage);
          }
      }

      var connectionCts = new CancellationTokenSource();
      var connectionToken = connectionCts.Token;
      _connectionCancellationToken = connectionCts;

      _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(connectionToken), connectionToken);
      _maintainConnectionTask.RunInBackground(_logger);

      _logger.Info("Started");
  }
  /// <summary>
  /// Stops publishing and connection maintenance, clears the pending message queue and waits
  /// until the maintenance task has finished.
  /// </summary>
  /// <remarks>
  /// <c>Task.WhenAny</c> is used so that the wait never throws, whatever state the maintenance
  /// task ends in.
  /// </remarks>
  public async Task StopAsync()
  {
      ThrowIfDisposed();

      StopPublishing();
      StopMaintainingConnection();

      _messageQueue.Clear();

      var maintainConnectionTask = _maintainConnectionTask;
      if (maintainConnectionTask != null)
      {
          // ConfigureAwait(false) like every other await in this class: the continuation does
          // not need the caller's synchronization context.
          await Task.WhenAny(maintainConnectionTask).ConfigureAwait(false);
          _maintainConnectionTask = null;
      }
  }
  /// <summary>
  /// Sends a ping through <see cref="InternalClient"/>.
  /// </summary>
  /// <param name="cancellationToken">Passed through to the internal client.</param>
  /// <returns>The task returned by the internal client.</returns>
  public Task PingAsync(CancellationToken cancellationToken) => InternalClient.PingAsync(cancellationToken);
  /// <summary>
  /// Wraps the application message into a managed message and queues it for publishing.
  /// </summary>
  /// <param name="applicationMessage">The message to queue.</param>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>
  /// A newly created, empty <see cref="MqttClientPublishResult"/>; the message has only been
  /// queued at this point.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
  public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      var managedMessage = new ManagedMqttApplicationMessageBuilder()
          .WithApplicationMessage(applicationMessage)
          .Build();

      await PublishAsync(managedMessage).ConfigureAwait(false);

      return new MqttClientPublishResult();
  }
  /// <summary>
  /// Adds a managed application message to the internal queue (and to the storage, when one is
  /// configured), applying the configured overflow strategy when the queue is full.
  /// </summary>
  /// <param name="applicationMessage">The managed message to queue.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
  /// <exception cref="InvalidOperationException"><see cref="StartAsync"/> has not been called yet.</exception>
  public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      if (Options == null)
      {
          throw new InvalidOperationException("call StartAsync before publishing messages");
      }

      MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

      ManagedMqttApplicationMessage droppedMessage = null;
      ApplicationMessageSkippedEventArgs skippedEventArgs = null;

      try
      {
          using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
          {
              var queueIsFull = _messageQueue.Count >= Options.MaxPendingMessages;
              if (queueIsFull)
              {
                  switch (Options.PendingMessagesOverflowStrategy)
                  {
                      case MqttPendingMessagesOverflowStrategy.DropNewMessage:
                          // The new message is never queued; the finally block reports it as skipped.
                          _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
                          return;

                      case MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage:
                          // Make room by removing the head of the queue; it is reported as skipped.
                          droppedMessage = _messageQueue.RemoveFirst();
                          _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(droppedMessage);
                          break;
                  }
              }

              _messageQueue.Enqueue(applicationMessage);

              if (_storageManager != null)
              {
                  // Keep the storage in step with the in-memory queue.
                  if (droppedMessage != null)
                  {
                      await _storageManager.RemoveAsync(droppedMessage).ConfigureAwait(false);
                  }

                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }
          }
      }
      finally
      {
          // The handler is invoked after the queue lock has been released.
          var skippedHandler = skippedEventArgs != null ? ApplicationMessageSkippedHandler : null;
          if (skippedHandler != null)
          {
              await skippedHandler.HandleApplicationMessageSkippedAsync(skippedEventArgs).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Records the topic filters as pending subscriptions and signals the maintenance loop.
  /// The filters are sent to the broker later by <see cref="PublishSubscriptionsAsync"/>.
  /// </summary>
  /// <param name="topicFilters">The topic filters to subscribe to.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  public Task SubscribeAsync(IEnumerable<MqttTopicFilter> topicFilters)
  {
      ThrowIfDisposed();

      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              var topic = filter.Topic;

              // A new subscription supersedes a still pending unsubscription of the same topic.
              _subscriptions[topic] = filter.QualityOfServiceLevel;
              _unsubscriptions.Remove(topic);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Records the topics as pending unsubscriptions and signals the maintenance loop.
  /// The request is sent to the broker later by <see cref="PublishSubscriptionsAsync"/>.
  /// </summary>
  /// <param name="topics">The topics to unsubscribe from.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topics"/> is null.</exception>
  public Task UnsubscribeAsync(IEnumerable<string> topics)
  {
      ThrowIfDisposed();

      if (topics == null)
      {
          throw new ArgumentNullException(nameof(topics));
      }

      lock (_subscriptions)
      {
          foreach (var topicToRemove in topics)
          {
              // An unsubscription supersedes a still pending subscription of the same topic.
              _subscriptions.Remove(topicToRemove);
              _unsubscriptions.Add(topicToRemove);
          }
      }

      _subscriptionsQueuedSignal.Release();

      return Task.FromResult(0);
  }
  /// <summary>
  /// Stops the background work, waits for the maintenance task to finish and releases the
  /// owned resources, including <see cref="InternalClient"/>.
  /// </summary>
  /// <param name="disposing">True when called from <c>Dispose()</c>; only then are managed resources released.</param>
  protected override void Dispose(bool disposing)
  {
      if (disposing)
      {
          StopPublishing();
          StopMaintainingConnection();

          var maintainConnectionTask = _maintainConnectionTask;
          if (maintainConnectionTask != null)
          {
              // Wait via WhenAny (as StopAsync does) so that a cancelled or faulted maintenance
              // task cannot make Dispose throw. The task is created with Task.Run(..., token) and
              // ends up Canceled when the token is cancelled before the delegate starts; waiting
              // on it directly would surface a TaskCanceledException here.
              Task.WhenAny(maintainConnectionTask).GetAwaiter().GetResult();
              _maintainConnectionTask = null;
          }

          _messageQueue.Dispose();
          _messageQueueLock.Dispose();
          InternalClient.Dispose();
          _subscriptionsQueuedSignal.Dispose();
      }

      base.Dispose(disposing);
  }
  /// <summary>
  /// Background loop that keeps the connection alive until the token is cancelled. On exit it
  /// disconnects the internal client (unless this instance is disposed) and clears all
  /// subscription bookkeeping.
  /// </summary>
  /// <param name="cancellationToken">Cancelled to stop the loop.</param>
  async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Regular shutdown path.
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error exception while maintaining connection.");
      }
      finally
      {
          if (!IsDisposed)
          {
              await DisconnectInternalClientAsync().ConfigureAwait(false);
              _logger.Info("Stopped");
          }

          _reconnectSubscriptions.Clear();

          lock (_subscriptions)
          {
              _subscriptions.Clear();
              _unsubscriptions.Clear();
          }
      }
  }

  /// <summary>
  /// Disconnects the internal client, bounded by the configured communication timeout.
  /// Failures are logged and never propagated.
  /// </summary>
  async Task DisconnectInternalClientAsync()
  {
      try
      {
          using (var timeoutSource = new CancellationTokenSource(Options.ClientOptions.CommunicationTimeout))
          {
              await InternalClient.DisconnectAsync(timeoutSource.Token).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          _logger.Warning("Timeout while sending DISCONNECT packet.");
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while disconnecting.");
      }
  }
  /// <summary>
  /// Runs one iteration of the maintenance loop: (re)connects when required and reacts to the
  /// resulting connection state. Exceptions are logged and never propagated.
  /// </summary>
  /// <param name="cancellationToken">Cancels connecting and waiting.</param>
  async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var state = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false);

          switch (state)
          {
              case ReconnectionResult.NotConnected:
                  // Connecting failed: pause the publisher and retry after the configured delay.
                  StopPublishing();
                  await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  break;

              case ReconnectionResult.Reconnected:
                  // No session present on the broker: send the known subscriptions again first.
                  await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  break;

              case ReconnectionResult.Recovered:
                  // Session is present on the broker: only the publisher has to be restarted.
                  StartPublishing();
                  break;

              case ReconnectionResult.StillConnected:
                  // Process queued (un)subscriptions for at most one connection check interval.
                  await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
                  break;
          }
      }
      catch (OperationCanceledException)
      {
          // Stop requested; the calling loop checks the token itself.
      }
      catch (MqttCommunicationException exception)
      {
          _logger.Warning(exception, "Communication error while maintaining connection.");
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Background loop that publishes queued messages while the client is connected and the
  /// token is not cancelled. Exceptions are logged and never propagated.
  /// </summary>
  /// <param name="cancellationToken">Cancelled by <see cref="StopPublishing"/>.</param>
  async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested && InternalClient.IsConnected)
          {
              // The head of the queue is only peeked here. It is removed after the publish
              // attempt, so a failed message is never re-enqueued. Re-enqueueing used to ignore
              // the queue cap and to shuffle the message order, which left the
              // DropOldestQueuedMessage strategy unable to tell which message is the oldest.
              var nextMessage = _messageQueue.PeekAndWait(cancellationToken);
              if (nextMessage != null)
              {
                  cancellationToken.ThrowIfCancellationRequested();
                  await TryPublishQueuedMessageAsync(nextMessage).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
          // Publishing was stopped.
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while publishing queued application messages.");
      }
      finally
      {
          _logger.Verbose("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes one queued message through the internal client and removes it from the queue
  /// and the storage on success. The processed handler is always notified, with the exception
  /// that occurred (if any).
  /// </summary>
  /// <param name="message">The message currently at the head of the queue.</param>
  async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      Exception failure = null;

      try
      {
          await InternalClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
          await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
      }
      catch (MqttCommunicationException exception)
      {
          failure = exception;
          _logger.Warning(exception, "Publishing application message ({0}) failed.", message.Id);

          // A QoS 0 message must not stay on the queue after a failed attempt.
          // A QoS 1 or 2 message stays queued for a retry. With a queue cap it may meanwhile have
          // been dropped by this.PublishAsync, in which case it is not retried. That contradicts
          // QoS 1/2 expectations, but so does using a cap, so it matches prior behavior.
          var isAtMostOnce = message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce;
          if (isAtMostOnce)
          {
              await RemoveQueuedMessageAsync(message).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          failure = exception;
          _logger.Error(exception, "Error while publishing application message ({0}).", message.Id);
      }
      finally
      {
          var processedHandler = ApplicationMessageProcessedHandler;
          if (processedHandler != null)
          {
              var processedEventArgs = new ApplicationMessageProcessedEventArgs(message, failure);
              await processedHandler.HandleApplicationMessageProcessedAsync(processedEventArgs).ConfigureAwait(false);
          }
      }
  }

  /// <summary>
  /// Removes the message from the queue (matched by id) and from the storage while holding
  /// the queue lock, to avoid conflicts with this.PublishAsync.
  /// </summary>
  async Task RemoveQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
      {
          // While this message was being published, this.PublishAsync may already have dropped
          // it to make room (cap with DropOldestQueuedMessage). The head of the queue is removed
          // only when it still is this message; otherwise nothing is removed.
          _messageQueue.RemoveFirst(queued => queued.Id.Equals(message.Id));

          if (_storageManager != null)
          {
              await _storageManager.RemoveAsync(message).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Waits, for at most <paramref name="timeout"/> in total, for queued subscribe/unsubscribe
  /// requests and sends them to the broker. Processed requests are also applied to the
  /// reconnect subscriptions.
  /// </summary>
  /// <param name="timeout">Total time budget for waiting on new requests.</param>
  /// <param name="cancellationToken">Cancels waiting for new requests.</param>
  async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  {
      var endTime = DateTime.UtcNow + timeout;

      while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false))
      {
          List<MqttTopicFilter> subscriptions;
          HashSet<string> unsubscriptions;

          // Take a snapshot and reset the shared buckets under the lock; the network calls
          // below run outside of it.
          lock (_subscriptions)
          {
              subscriptions = _subscriptions.Select(i => new MqttTopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
              _subscriptions.Clear();
              unsubscriptions = new HashSet<string>(_unsubscriptions);
              _unsubscriptions.Clear();
          }

          // The signal is released once per Subscribe/Unsubscribe call, but a single pass drains
          // everything, so later signals can find nothing left to do.
          if (subscriptions.Count == 0 && unsubscriptions.Count == 0)
          {
              continue;
          }

          _logger.Verbose("Publishing {0} subscriptions and {1} unsubscriptions", subscriptions.Count, unsubscriptions.Count);

          foreach (var unsubscription in unsubscriptions)
          {
              _reconnectSubscriptions.Remove(unsubscription);
          }

          foreach (var subscription in subscriptions)
          {
              _reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
          }

          try
          {
              if (unsubscriptions.Count > 0)
              {
                  await InternalClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
              }

              if (subscriptions.Count > 0)
              {
                  await InternalClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
              }
          }
          catch (Exception exception)
          {
              await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Sends all reconnect subscriptions to the broker again. Failures are routed to
  /// <see cref="HandleSubscriptionExceptionAsync"/>.
  /// </summary>
  async Task PublishReconnectSubscriptionsAsync()
  {
      _logger.Info("Publishing subscriptions at reconnect");

      try
      {
          if (_reconnectSubscriptions.Count == 0)
          {
              return;
          }

          var topicFilters = _reconnectSubscriptions
              .Select(entry => new MqttTopicFilter { Topic = entry.Key, QualityOfServiceLevel = entry.Value })
              .ToArray();

          await InternalClient.SubscribeAsync(topicFilters).ConfigureAwait(false);
      }
      catch (Exception exception)
      {
          await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Logs a failed subscription synchronization and notifies
  /// <see cref="SynchronizingSubscriptionsFailedHandler"/> when one is set.
  /// </summary>
  /// <param name="exception">The exception thrown while subscribing or unsubscribing.</param>
  async Task HandleSubscriptionExceptionAsync(Exception exception)
  {
      _logger.Warning(exception, "Synchronizing subscriptions failed.");

      // Read the property once and test the local copy. The handler can be replaced by another
      // thread at any time; testing the property again could pass the null check and then
      // dereference a null local.
      var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
      if (synchronizingSubscriptionsFailedHandler != null)
      {
          await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Connects the internal client when it is not connected.
  /// </summary>
  /// <param name="cancellationToken">Passed to the connect call.</param>
  /// <returns>
  /// <c>StillConnected</c> when already connected; <c>Recovered</c> when the connect result
  /// reports a present session, otherwise <c>Reconnected</c>; <c>NotConnected</c> when
  /// connecting threw (the exception is passed to <see cref="ConnectingFailedHandler"/>).
  /// </returns>
  async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancellationToken)
  {
      if (InternalClient.IsConnected)
      {
          return ReconnectionResult.StillConnected;
      }

      try
      {
          var connectResult = await InternalClient.ConnectAsync(Options.ClientOptions, cancellationToken).ConfigureAwait(false);

          if (connectResult.IsSessionPresent)
          {
              return ReconnectionResult.Recovered;
          }

          return ReconnectionResult.Reconnected;
      }
      catch (Exception exception)
      {
          var failedHandler = ConnectingFailedHandler;
          if (failedHandler != null)
          {
              var failedEventArgs = new ManagedProcessFailedEventArgs(exception);
              await failedHandler.HandleConnectingFailedAsync(failedEventArgs).ConfigureAwait(false);
          }

          return ReconnectionResult.NotConnected;
      }
  }
  /// <summary>
  /// (Re)starts the background publisher with a fresh cancellation token source, stopping a
  /// previously started publisher first.
  /// </summary>
  void StartPublishing()
  {
      var publisherAlreadyStarted = _publishingCancellationToken != null;
      if (publisherAlreadyStarted)
      {
          StopPublishing();
      }

      var publishingCts = new CancellationTokenSource();
      var publishingToken = publishingCts.Token;
      _publishingCancellationToken = publishingCts;

      Task.Run(() => PublishQueuedMessagesAsync(publishingToken), publishingToken).RunInBackground(_logger);
  }
  /// <summary>
  /// Cancels the background publisher and disposes its cancellation token source.
  /// Does nothing when no publisher is running.
  /// </summary>
  /// <remarks>
  /// This method is reached from the caller's thread (StopAsync, Dispose) as well as from the
  /// maintenance loop. The field is therefore swapped out atomically, so exactly one caller
  /// cancels and disposes a given source and nobody cancels an already disposed one.
  /// </remarks>
  void StopPublishing()
  {
      var publishingCancellationToken = Interlocked.Exchange(ref _publishingCancellationToken, null);
      if (publishingCancellationToken == null)
      {
          return;
      }

      try
      {
          publishingCancellationToken.Cancel(false);
      }
      finally
      {
          publishingCancellationToken.Dispose();
      }
  }
  /// <summary>
  /// Cancels the connection maintenance loop and disposes its cancellation token source.
  /// Does nothing when the loop is not running.
  /// </summary>
  /// <remarks>
  /// The field is swapped out atomically so that concurrent callers (e.g. StopAsync and
  /// Dispose) never cancel or dispose the same source twice, and so that <see cref="IsStarted"/>
  /// never observes a disposed source.
  /// </remarks>
  void StopMaintainingConnection()
  {
      var connectionCancellationToken = Interlocked.Exchange(ref _connectionCancellationToken, null);
      if (connectionCancellationToken == null)
      {
          return;
      }

      try
      {
          connectionCancellationToken.Cancel(false);
      }
      finally
      {
          connectionCancellationToken.Dispose();
      }
  }
  /// <summary>
  /// Computes the time left until <paramref name="endTime"/> (UTC), clamped at zero.
  /// </summary>
  /// <param name="endTime">The deadline, compared against <see cref="DateTime.UtcNow"/>.</param>
  /// <returns>The remaining time, or <see cref="TimeSpan.Zero"/> when the deadline has passed.</returns>
  static TimeSpan GetRemainingTime(DateTime endTime)
  {
      var timeLeft = endTime - DateTime.UtcNow;
      if (timeLeft > TimeSpan.Zero)
      {
          return timeLeft;
      }

      return TimeSpan.Zero;
  }
  520. }
  521. }