Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 

586 righe
23 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Client.Connecting;
  3. using MQTTnet.Client.Disconnecting;
  4. using MQTTnet.Client.Publishing;
  5. using MQTTnet.Client.Receiving;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Protocol;
  10. using MQTTnet.Server;
  11. using System;
  12. using System.Collections.Generic;
  13. using System.Linq;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : Disposable, IManagedMqttClient
  19. {
  20. private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  21. /// <summary>
  22. /// The subscriptions are managed in 2 separate buckets:
  23. /// <see cref="_subscriptions"/> and <see cref="_unsubscriptions"/> are processed during normal operation
  24. /// and are moved to the <see cref="_reconnectSubscriptions"/> when they get processed. They can be accessed by
  25. /// any thread and are therefore mutex'ed. <see cref="_reconnectSubscriptions"/> get sent to the broker
  26. /// at reconnect and are solely owned by <see cref="MaintainConnectionAsync"/>.
  27. /// </summary>
  28. private readonly Dictionary<string, MqttQualityOfServiceLevel> _reconnectSubscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  29. private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  30. private readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  31. private readonly SemaphoreSlim _subscriptionsQueuedSignal = new SemaphoreSlim(0);
  32. private readonly IMqttClient _mqttClient;
  33. private readonly IMqttNetLogger _logger;
  34. private readonly AsyncLock _messageQueueLock = new AsyncLock();
  35. private CancellationTokenSource _connectionCancellationToken;
  36. private CancellationTokenSource _publishingCancellationToken;
  37. private Task _maintainConnectionTask;
  38. private ManagedMqttClientStorageManager _storageManager;
  39. public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
  40. {
  41. _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  42. if (logger == null) throw new ArgumentNullException(nameof(logger));
  43. _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  44. }
  45. public bool IsConnected => _mqttClient.IsConnected;
  46. public bool IsStarted => _connectionCancellationToken != null;
  47. public int PendingApplicationMessagesCount => _messageQueue.Count;
  48. public IManagedMqttClientOptions Options { get; private set; }
  49. public IMqttClientConnectedHandler ConnectedHandler
  50. {
  51. get => _mqttClient.ConnectedHandler;
  52. set => _mqttClient.ConnectedHandler = value;
  53. }
  54. public IMqttClientDisconnectedHandler DisconnectedHandler
  55. {
  56. get => _mqttClient.DisconnectedHandler;
  57. set => _mqttClient.DisconnectedHandler = value;
  58. }
  59. public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  60. {
  61. get => _mqttClient.ApplicationMessageReceivedHandler;
  62. set => _mqttClient.ApplicationMessageReceivedHandler = value;
  63. }
  64. public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }
  65. public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }
  66. public IConnectingFailedHandler ConnectingFailedHandler { get; set; }
  67. public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  68. public async Task StartAsync(IManagedMqttClientOptions options)
  69. {
  70. ThrowIfDisposed();
  71. if (options == null) throw new ArgumentNullException(nameof(options));
  72. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  73. if (!_maintainConnectionTask?.IsCompleted ?? false) throw new InvalidOperationException("The managed client is already started.");
  74. Options = options;
  75. if (Options.Storage != null)
  76. {
  77. _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
  78. var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
  79. foreach (var message in messages)
  80. {
  81. _messageQueue.Enqueue(message);
  82. }
  83. }
  84. _connectionCancellationToken = new CancellationTokenSource();
  85. _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token);
  86. _maintainConnectionTask.Forget(_logger);
  87. _logger.Info("Started");
  88. }
  89. public async Task StopAsync()
  90. {
  91. ThrowIfDisposed();
  92. StopPublishing();
  93. StopMaintainingConnection();
  94. _messageQueue.Clear();
  95. if (_maintainConnectionTask != null)
  96. {
  97. await Task.WhenAny(_maintainConnectionTask);
  98. _maintainConnectionTask = null;
  99. }
  100. }
  101. public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  102. {
  103. ThrowIfDisposed();
  104. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  105. await PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()).ConfigureAwait(false);
  106. return new MqttClientPublishResult();
  107. }
  108. public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  109. {
  110. ThrowIfDisposed();
  111. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  112. if (Options == null) throw new InvalidOperationException("call StartAsync before publishing messages");
  113. MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);
  114. ManagedMqttApplicationMessage removedMessage = null;
  115. ApplicationMessageSkippedEventArgs applicationMessageSkippedEventArgs = null;
  116. try
  117. {
  118. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))
  119. {
  120. if (_messageQueue.Count >= Options.MaxPendingMessages)
  121. {
  122. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  123. {
  124. _logger.Verbose("Skipping publish of new application message because internal queue is full.");
  125. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
  126. return;
  127. }
  128. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  129. {
  130. removedMessage = _messageQueue.RemoveFirst();
  131. _logger.Verbose("Removed oldest application message from internal queue because it is full.");
  132. applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
  133. }
  134. }
  135. _messageQueue.Enqueue(applicationMessage);
  136. if (_storageManager != null)
  137. {
  138. if (removedMessage != null)
  139. {
  140. await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
  141. }
  142. await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  143. }
  144. }
  145. }
  146. finally
  147. {
  148. if (applicationMessageSkippedEventArgs != null)
  149. {
  150. var applicationMessageSkippedHandler = ApplicationMessageSkippedHandler;
  151. if (applicationMessageSkippedHandler != null)
  152. {
  153. await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
  154. }
  155. }
  156. }
  157. }
  158. public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  159. {
  160. ThrowIfDisposed();
  161. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  162. lock (_subscriptions)
  163. {
  164. foreach (var topicFilter in topicFilters)
  165. {
  166. _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
  167. _unsubscriptions.Remove(topicFilter.Topic);
  168. }
  169. }
  170. _subscriptionsQueuedSignal.Release();
  171. return Task.FromResult(0);
  172. }
  173. public Task UnsubscribeAsync(IEnumerable<string> topics)
  174. {
  175. ThrowIfDisposed();
  176. if (topics == null) throw new ArgumentNullException(nameof(topics));
  177. lock (_subscriptions)
  178. {
  179. foreach (var topic in topics)
  180. {
  181. _subscriptions.Remove(topic);
  182. _unsubscriptions.Add(topic);
  183. }
  184. }
  185. _subscriptionsQueuedSignal.Release();
  186. return Task.FromResult(0);
  187. }
  188. protected override void Dispose(bool disposing)
  189. {
  190. if (disposing)
  191. {
  192. StopPublishing();
  193. StopMaintainingConnection();
  194. if (_maintainConnectionTask != null)
  195. {
  196. _maintainConnectionTask.GetAwaiter().GetResult();
  197. _maintainConnectionTask = null;
  198. }
  199. _messageQueue.Dispose();
  200. _messageQueueLock.Dispose();
  201. _mqttClient.Dispose();
  202. _subscriptionsQueuedSignal.Dispose();
  203. }
  204. base.Dispose(disposing);
  205. }
  206. private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  207. {
  208. try
  209. {
  210. while (!cancellationToken.IsCancellationRequested)
  211. {
  212. await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
  213. }
  214. }
  215. catch (OperationCanceledException)
  216. {
  217. }
  218. catch (Exception exception)
  219. {
  220. _logger.Error(exception, "Error exception while maintaining connection.");
  221. }
  222. finally
  223. {
  224. if (!IsDisposed)
  225. {
  226. try
  227. {
  228. await _mqttClient.DisconnectAsync().ConfigureAwait(false);
  229. }
  230. catch (Exception exception)
  231. {
  232. _logger.Error(exception, "Error while disconnecting.");
  233. }
  234. _logger.Info("Stopped");
  235. }
  236. _reconnectSubscriptions.Clear();
  237. lock (_subscriptions)
  238. {
  239. _subscriptions.Clear();
  240. _unsubscriptions.Clear();
  241. }
  242. }
  243. }
  244. private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  245. {
  246. try
  247. {
  248. var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
  249. if (connectionState == ReconnectionResult.NotConnected)
  250. {
  251. StopPublishing();
  252. await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  253. return;
  254. }
  255. if (connectionState == ReconnectionResult.Reconnected)
  256. {
  257. await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
  258. StartPublishing();
  259. return;
  260. }
  261. if (connectionState == ReconnectionResult.Recovered)
  262. {
  263. StartPublishing();
  264. return;
  265. }
  266. if (connectionState == ReconnectionResult.StillConnected)
  267. {
  268. await PublishSubscriptionsAsync(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
  269. }
  270. }
  271. catch (OperationCanceledException)
  272. {
  273. }
  274. catch (MqttCommunicationException exception)
  275. {
  276. _logger.Warning(exception, "Communication error while maintaining connection.");
  277. }
  278. catch (Exception exception)
  279. {
  280. _logger.Error(exception, "Error exception while maintaining connection.");
  281. }
  282. }
  283. private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  284. {
  285. try
  286. {
  287. while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
  288. {
  289. // Peek at the message without dequeueing in order to prevent the
  290. // possibility of the queue growing beyond the configured cap.
  291. // Previously, messages could be re-enqueued if there was an
  292. // exception, and this re-enqueueing did not honor the cap.
  293. // Furthermore, because re-enqueueing would shuffle the order
  294. // of the messages, the DropOldestQueuedMessage strategy would
  295. // be unable to know which message is actually the oldest and would
  296. // instead drop the first item in the queue.
  297. var message = _messageQueue.PeekAndWait(cancellationToken);
  298. if (message == null)
  299. {
  300. continue;
  301. }
  302. cancellationToken.ThrowIfCancellationRequested();
  303. await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
  304. }
  305. }
  306. catch (OperationCanceledException)
  307. {
  308. }
  309. catch (Exception exception)
  310. {
  311. _logger.Error(exception, "Error while publishing queued application messages.");
  312. }
  313. finally
  314. {
  315. _logger.Verbose("Stopped publishing messages.");
  316. }
  317. }
  318. private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  319. {
  320. Exception transmitException = null;
  321. try
  322. {
  323. await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
  324. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  325. {
  326. // While publishing this message, this.PublishAsync could have booted this
  327. // message off the queue to make room for another (when using a cap
  328. // with the DropOldestQueuedMessage strategy). If the first item
  329. // in the queue is equal to this message, then it's safe to remove
  330. // it from the queue. If not, that means this.PublishAsync has already
  331. // removed it, in which case we don't want to do anything.
  332. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  333. if (_storageManager != null)
  334. {
  335. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  336. }
  337. }
  338. }
  339. catch (MqttCommunicationException exception)
  340. {
  341. transmitException = exception;
  342. _logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");
  343. if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  344. {
  345. //If QoS 0, we don't want this message to stay on the queue.
  346. //If QoS 1 or 2, it's possible that, when using a cap, this message
  347. //has been booted off the queue by this.PublishAsync, in which case this
  348. //thread will not continue to try to publish it. While this does
  349. //contradict the expected behavior of QoS 1 and 2, that's also true
  350. //for the usage of a message queue cap, so it's still consistent
  351. //with prior behavior in that way.
  352. using (await _messageQueueLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
  353. {
  354. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  355. if (_storageManager != null)
  356. {
  357. await _storageManager.RemoveAsync(message).ConfigureAwait(false);
  358. }
  359. }
  360. }
  361. }
  362. catch (Exception exception)
  363. {
  364. transmitException = exception;
  365. _logger.Error(exception, $"Error while publishing application message ({message.Id}).");
  366. }
  367. finally
  368. {
  369. var eventHandler = ApplicationMessageProcessedHandler;
  370. if (eventHandler != null)
  371. {
  372. var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException);
  373. await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false);
  374. }
  375. }
  376. }
  377. private async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancellationToken)
  378. {
  379. var endTime = DateTime.UtcNow + timeout;
  380. while (await _subscriptionsQueuedSignal.WaitAsync(GetRemainingTime(endTime), cancellationToken).ConfigureAwait(false))
  381. {
  382. List<TopicFilter> subscriptions;
  383. HashSet<string> unsubscriptions;
  384. lock (_subscriptions)
  385. {
  386. subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
  387. _subscriptions.Clear();
  388. unsubscriptions = new HashSet<string>(_unsubscriptions);
  389. _unsubscriptions.Clear();
  390. }
  391. if (!subscriptions.Any() && !unsubscriptions.Any())
  392. {
  393. continue;
  394. }
  395. _logger.Verbose($"Publishing subscriptions ({subscriptions.Count} subscriptions and {unsubscriptions.Count} unsubscriptions)");
  396. foreach (var unsubscription in unsubscriptions)
  397. {
  398. _reconnectSubscriptions.Remove(unsubscription);
  399. }
  400. foreach (var subscription in subscriptions)
  401. {
  402. _reconnectSubscriptions[subscription.Topic] = subscription.QualityOfServiceLevel;
  403. }
  404. try
  405. {
  406. if (unsubscriptions.Any())
  407. {
  408. await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
  409. }
  410. if (subscriptions.Any())
  411. {
  412. await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  413. }
  414. }
  415. catch (Exception exception)
  416. {
  417. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  418. }
  419. }
  420. }
  421. private async Task PublishReconnectSubscriptionsAsync()
  422. {
  423. _logger.Info("Publishing subscriptions at reconnect");
  424. try
  425. {
  426. if (_reconnectSubscriptions.Any())
  427. {
  428. var subscriptions = _reconnectSubscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value });
  429. await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
  430. }
  431. }
  432. catch (Exception exception)
  433. {
  434. await HandleSubscriptionExceptionAsync(exception).ConfigureAwait(false);
  435. }
  436. }
  437. private async Task HandleSubscriptionExceptionAsync(Exception exception)
  438. {
  439. _logger.Warning(exception, "Synchronizing subscriptions failed.");
  440. var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
  441. if (SynchronizingSubscriptionsFailedHandler != null)
  442. {
  443. await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  444. }
  445. }
  446. private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  447. {
  448. if (_mqttClient.IsConnected)
  449. {
  450. return ReconnectionResult.StillConnected;
  451. }
  452. try
  453. {
  454. var result = await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
  455. return result.IsSessionPresent ? ReconnectionResult.Recovered : ReconnectionResult.Reconnected;
  456. }
  457. catch (Exception exception)
  458. {
  459. var connectingFailedHandler = ConnectingFailedHandler;
  460. if (connectingFailedHandler != null)
  461. {
  462. await connectingFailedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
  463. }
  464. return ReconnectionResult.NotConnected;
  465. }
  466. }
  467. private void StartPublishing()
  468. {
  469. if (_publishingCancellationToken != null)
  470. {
  471. StopPublishing();
  472. }
  473. var cts = new CancellationTokenSource();
  474. _publishingCancellationToken = cts;
  475. Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger);
  476. }
  477. private void StopPublishing()
  478. {
  479. _publishingCancellationToken?.Cancel(false);
  480. _publishingCancellationToken?.Dispose();
  481. _publishingCancellationToken = null;
  482. }
  483. private void StopMaintainingConnection()
  484. {
  485. _connectionCancellationToken?.Cancel(false);
  486. _connectionCancellationToken?.Dispose();
  487. _connectionCancellationToken = null;
  488. }
  489. private TimeSpan GetRemainingTime(DateTime endTime)
  490. {
  491. var remainingTime = endTime - DateTime.UtcNow;
  492. return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
  493. }
  494. }
  495. }