選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

ManagedMqttClient.cs 20 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Client.Connecting;
  8. using MQTTnet.Client.Disconnecting;
  9. using MQTTnet.Client.Publishing;
  10. using MQTTnet.Client.Receiving;
  11. using MQTTnet.Diagnostics;
  12. using MQTTnet.Exceptions;
  13. using MQTTnet.Internal;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. namespace MQTTnet.Extensions.ManagedClient
  17. {
  18. public class ManagedMqttClient : IManagedMqttClient
  19. {
  // Outgoing messages waiting to be published. The queue instance is also used as the
  // lock object that serializes queue mutations between PublishAsync and the publisher task.
  private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

  // Desired subscriptions (topic -> QoS). Also used as the lock object guarding
  // _unsubscriptions and _subscriptionsNotPushed updates made by Subscribe/Unsubscribe.
  private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

  // Topics removed by UnsubscribeAsync that still have to be sent to the server.
  private readonly HashSet<string> _unsubscriptions = new HashSet<string>();

  // The wrapped low level client and the logger scoped to this class.
  private readonly IMqttClient _mqttClient;
  private readonly IMqttNetChildLogger _logger;

  // Cancellation sources for the connection maintenance loop and the publisher task.
  // Both are null while the respective task is not running.
  private CancellationTokenSource _connectionCancellationToken;
  private CancellationTokenSource _publishingCancellationToken;

  // Task running MaintainConnectionAsync; null when the client is stopped.
  private Task _maintainConnectionTask;

  // Only created by StartAsync when the options supply a storage.
  private ManagedMqttClientStorageManager _storageManager;

  private bool _disposed;

  // True when _subscriptions/_unsubscriptions contain changes not yet sent to the server.
  private bool _subscriptionsNotPushed;

  /// <summary>
  /// Creates a managed client on top of the given low level MQTT client.
  /// </summary>
  /// <param name="mqttClient">The client used for all network communication.</param>
  /// <param name="logger">The parent logger; a child logger named after this class is created from it.</param>
  /// <exception cref="ArgumentNullException">Thrown when an argument is null.</exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
  {
      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      _mqttClient = mqttClient;
      _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  }
  /// <summary>Gets whether the wrapped client reports an established connection.</summary>
  public bool IsConnected
  {
      get { return _mqttClient.IsConnected; }
  }

  /// <summary>
  /// Gets whether the client has been started, i.e. a cancellation source for the
  /// connection maintenance loop currently exists.
  /// </summary>
  public bool IsStarted
  {
      get { return _connectionCancellationToken != null; }
  }

  /// <summary>Gets the number of application messages currently held in the internal queue.</summary>
  public int PendingApplicationMessagesCount
  {
      get { return _messageQueue.Count; }
  }

  /// <summary>Gets the options passed to the most recent <see cref="StartAsync"/> call.</summary>
  public IManagedMqttClientOptions Options { get; private set; }

  /// <summary>Gets or sets the connected handler; forwarded to the wrapped client.</summary>
  public IMqttClientConnectedHandler ConnectedHandler
  {
      get { return _mqttClient.ConnectedHandler; }
      set { _mqttClient.ConnectedHandler = value; }
  }

  /// <summary>Gets or sets the disconnected handler; forwarded to the wrapped client.</summary>
  public IMqttClientDisconnectedHandler DisconnectedHandler
  {
      get { return _mqttClient.DisconnectedHandler; }
      set { _mqttClient.DisconnectedHandler = value; }
  }

  /// <summary>Gets or sets the handler for received application messages; forwarded to the wrapped client.</summary>
  public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
  {
      get { return _mqttClient.ApplicationMessageReceivedHandler; }
      set { _mqttClient.ApplicationMessageReceivedHandler = value; }
  }

  /// <summary>Gets or sets the handler invoked after each publish attempt of a queued message.</summary>
  public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when a message is dropped because the queue is full.</summary>
  public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when a connection attempt fails.</summary>
  public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

  /// <summary>Gets or sets the handler invoked when pushing subscriptions to the server fails.</summary>
  public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
  /// <summary>
  /// Starts the managed client: stores the options, loads persisted messages (when a
  /// storage is configured) into the queue and launches the connection maintenance loop.
  /// </summary>
  /// <param name="options">The managed client options; <c>ClientOptions</c> must be set and use a clean session.</param>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException">Thrown when the client options are not set.</exception>
  /// <exception cref="NotSupportedException">Thrown when the client options do not request a clean session.</exception>
  /// <exception cref="InvalidOperationException">Thrown when the maintenance task is still running.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      ThrowIfDisposed();

      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

      if (!options.ClientOptions.CleanSession)
      {
          throw new NotSupportedException("The managed client does not support existing sessions.");
      }

      if (_maintainConnectionTask != null && !_maintainConnectionTask.IsCompleted)
      {
          throw new InvalidOperationException("The managed client is already started.");
      }

      Options = options;

      if (Options.Storage != null)
      {
          _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
          var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

          foreach (var message in messages)
          {
              _messageQueue.Enqueue(message);
          }
      }

      // Capture the token in a local before starting the task. The lambda must not read the
      // _connectionCancellationToken field because StopMaintainingConnection may already have
      // disposed the source and set the field to null by the time the lambda executes.
      var cancellationTokenSource = new CancellationTokenSource();
      var cancellationToken = cancellationTokenSource.Token;
      _connectionCancellationToken = cancellationTokenSource;

      _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);
      _maintainConnectionTask.Forget(_logger);

      _logger.Info("Started");
  }
  /// <summary>
  /// Stops the publisher task and the connection maintenance loop, clears the message
  /// queue and waits until the maintenance task has finished.
  /// </summary>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  public async Task StopAsync()
  {
      ThrowIfDisposed();

      StopPublishing();
      StopMaintainingConnection();

      _messageQueue.Clear();

      var maintainConnectionTask = _maintainConnectionTask;
      if (maintainConnectionTask != null)
      {
          // WhenAny completes as soon as the task has finished in any state, so this wait does
          // not rethrow a failure of the maintenance loop. ConfigureAwait(false) matches the
          // other awaits of this class and avoids resuming on a captured synchronization context.
          await Task.WhenAny(maintainConnectionTask).ConfigureAwait(false);
          _maintainConnectionTask = null;
      }
  }
  /// <summary>
  /// Wraps the application message into a managed message and enqueues it for publishing.
  /// </summary>
  /// <param name="applicationMessage">The message to enqueue.</param>
  /// <param name="cancellationToken">Accepted for interface compatibility; not observed by this method.</param>
  /// <returns>A new, default constructed publish result once the message has been enqueued.</returns>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="applicationMessage"/> is null.</exception>
  public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      var managedMessage = new ManagedMqttApplicationMessageBuilder()
          .WithApplicationMessage(applicationMessage)
          .Build();

      await PublishAsync(managedMessage).ConfigureAwait(false);

      return new MqttClientPublishResult();
  }
  /// <summary>
  /// Enqueues a managed application message. When the queue already holds
  /// <c>Options.MaxPendingMessages</c> entries, the configured overflow strategy decides
  /// whether the new message is dropped or the oldest queued message is evicted.
  /// The skipped handler (if any) is informed about the dropped message, and the storage
  /// (if any) is updated to mirror the queue.
  /// </summary>
  /// <param name="applicationMessage">The managed message to enqueue.</param>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="applicationMessage"/> is null.</exception>
  public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  {
      ThrowIfDisposed();

      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      MqttTopicValidator.ThrowIfInvalid(applicationMessage.ApplicationMessage.Topic);

      ManagedMqttApplicationMessage evictedMessage = null;
      ApplicationMessageSkippedEventArgs skippedEventArgs = null;

      try
      {
          lock (_messageQueue)
          {
              var queueIsFull = _messageQueue.Count >= Options.MaxPendingMessages;
              if (queueIsFull)
              {
                  switch (Options.PendingMessagesOverflowStrategy)
                  {
                      case MqttPendingMessagesOverflowStrategy.DropNewMessage:
                          _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);

                          // The finally block below still runs and reports the skipped message.
                          return;

                      case MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage:
                          evictedMessage = _messageQueue.RemoveFirst();
                          _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                          skippedEventArgs = new ApplicationMessageSkippedEventArgs(evictedMessage);
                          break;
                  }
              }

              _messageQueue.Enqueue(applicationMessage);
          }
      }
      finally
      {
          // Notify outside of the lock; awaiting is not allowed while holding it.
          if (skippedEventArgs != null)
          {
              var skippedHandler = ApplicationMessageSkippedHandler;
              if (skippedHandler != null)
              {
                  await skippedHandler.HandleApplicationMessageSkippedAsync(skippedEventArgs).ConfigureAwait(false);
              }
          }
      }

      if (_storageManager == null)
      {
          return;
      }

      if (evictedMessage != null)
      {
          await _storageManager.RemoveAsync(evictedMessage).ConfigureAwait(false);
      }

      await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  }
  /// <summary>
  /// Records the given topic filters as desired subscriptions. Nothing is sent here; the
  /// connection maintenance loop pushes the change because the pending flag gets set.
  /// </summary>
  /// <param name="topicFilters">The filters to add; an existing topic gets its QoS overwritten.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="topicFilters"/> is null.</exception>
  public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      ThrowIfDisposed();

      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              _subscriptions[filter.Topic] = filter.QualityOfServiceLevel;

              // Set per element, so an empty sequence does not trigger a synchronization.
              _subscriptionsNotPushed = true;
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Removes the given topics from the desired subscriptions. Topics that were actually
  /// subscribed are remembered as pending unsubscriptions and the pending flag is set so
  /// the connection maintenance loop pushes the change.
  /// </summary>
  /// <param name="topics">The topics to unsubscribe from; unknown topics are ignored.</param>
  /// <returns>An already completed task.</returns>
  /// <exception cref="ObjectDisposedException">Thrown when the client is disposed.</exception>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="topics"/> is null.</exception>
  public Task UnsubscribeAsync(IEnumerable<string> topics)
  {
      ThrowIfDisposed();

      if (topics == null)
      {
          throw new ArgumentNullException(nameof(topics));
      }

      lock (_subscriptions)
      {
          foreach (var topic in topics)
          {
              var wasSubscribed = _subscriptions.Remove(topic);
              if (!wasSubscribed)
              {
                  continue;
              }

              _unsubscriptions.Add(topic);
              _subscriptionsNotPushed = true;
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Stops all background work, waits for the connection maintenance task to finish and
  /// disposes the wrapped client. Calling it more than once has no further effect.
  /// </summary>
  public void Dispose()
  {
      if (!_disposed)
      {
          // Mark as disposed first so the maintenance loop skips its disconnect step.
          _disposed = true;

          StopPublishing();
          StopMaintainingConnection();

          if (_maintainConnectionTask != null)
          {
              // Blocks the caller until the maintenance task has completed.
              Task.WaitAny(_maintainConnectionTask);
              _maintainConnectionTask = null;
          }

          _mqttClient.Dispose();
      }
  }
  /// <summary>Guards public members against use after <see cref="Dispose"/>.</summary>
  /// <exception cref="ObjectDisposedException">Thrown when the client has been disposed.</exception>
  private void ThrowIfDisposed()
  {
      if (!_disposed)
      {
          return;
      }

      throw new ObjectDisposedException(nameof(ManagedMqttClient));
  }
  /// <summary>
  /// Body of the background maintenance task: repeats maintenance passes until
  /// cancellation is requested and finally disconnects the wrapped client unless this
  /// instance has been disposed in the meantime.
  /// </summary>
  /// <param name="cancellationToken">Signals that the loop should end.</param>
  private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (Exception error)
      {
          // Cancellation is the regular way to leave the loop and is not reported.
          if (!(error is OperationCanceledException))
          {
              _logger.Error(error, "Unhandled exception while maintaining connection.");
          }
      }
      finally
      {
          if (!_disposed)
          {
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception disconnectError)
              {
                  _logger.Error(disconnectError, "Error while disconnecting.");
              }

              _logger.Info("Stopped");
          }
      }
  }
  /// <summary>
  /// Performs one maintenance pass: (re)connects when required, pushes pending
  /// subscription changes, starts the publisher, or simply waits for the next check.
  /// Exceptions are logged and never propagate to the caller.
  /// </summary>
  /// <param name="cancellationToken">Aborts the delays between passes.</param>
  private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var state = await ReconnectIfRequiredAsync().ConfigureAwait(false);

          if (state == ReconnectionResult.NotConnected)
          {
              // No connection: stop the publisher and retry after the reconnect delay.
              StopPublishing();
              await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
              return;
          }

          var synchronizationRequired = state == ReconnectionResult.Reconnected || _subscriptionsNotPushed;
          if (synchronizationRequired)
          {
              // StartPublishing replaces a publisher that may still be running.
              await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
              StartPublishing();
              return;
          }

          if (state == ReconnectionResult.StillConnected)
          {
              await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Raised by the delays when the loop is being stopped; nothing to report.
      }
      catch (MqttCommunicationException communicationError)
      {
          _logger.Warning(communicationError, "Communication exception while maintaining connection.");
      }
      catch (Exception error)
      {
          _logger.Error(error, "Unhandled exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Body of the publisher task: publishes queued messages one after another while the
  /// wrapped client is connected and cancellation has not been requested.
  /// </summary>
  /// <param name="cancellationToken">Signals that publishing should stop.</param>
  private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
          {
              // The message is only peeked, not dequeued. It stays in the queue until it has
              // been handled, so a failed publish never needs to re-enqueue it. Re-enqueueing
              // would ignore the configured queue cap and shuffle the message order, which
              // in turn would break the DropOldestQueuedMessage strategy.
              var nextMessage = _messageQueue.PeekAndWait();
              if (nextMessage != null)
              {
                  cancellationToken.ThrowIfCancellationRequested();
                  await TryPublishQueuedMessageAsync(nextMessage).ConfigureAwait(false);
              }
          }
      }
      catch (OperationCanceledException)
      {
          // Regular shutdown of the publisher.
      }
      catch (Exception error)
      {
          _logger.Error(error, "Unhandled exception while publishing queued application messages.");
      }
      finally
      {
          _logger.Verbose("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes a single queued message through the wrapped client. On success the message
  /// is removed from the queue and the storage. On a communication failure it is only
  /// removed when its QoS is AtMostOnce; otherwise it stays queued for another attempt.
  /// The processed handler (if any) is always informed, including the exception if one occurred.
  /// </summary>
  /// <param name="message">The message peeked from the queue.</param>
  private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
  {
      Exception failure = null;

      try
      {
          await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
          await RemoveFromQueueAndStorageAsync().ConfigureAwait(false);
      }
      catch (MqttCommunicationException communicationError)
      {
          failure = communicationError;
          _logger.Warning(communicationError, $"Publishing application ({message.Id}) message failed.");

          // A QoS 0 message must not stay on the queue after a failed attempt.
          // A QoS 1 or 2 message stays queued and is retried; with a queue cap it may have
          // been evicted by this.PublishAsync in the meantime, in which case it is not
          // retried. That weakens QoS 1/2, but so does the queue cap itself.
          if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
          {
              await RemoveFromQueueAndStorageAsync().ConfigureAwait(false);
          }
      }
      catch (Exception error)
      {
          failure = error;
          _logger.Error(error, $"Unhandled exception while publishing application message ({message.Id}).");
      }
      finally
      {
          var processedHandler = ApplicationMessageProcessedHandler;
          if (processedHandler != null)
          {
              var processedEventArgs = new ApplicationMessageProcessedEventArgs(message, failure);
              await processedHandler.HandleApplicationMessageProcessedAsync(processedEventArgs).ConfigureAwait(false);
          }
      }

      // Removes the message from the in-memory queue and from the storage (if configured).
      async Task RemoveFromQueueAndStorageAsync()
      {
          // Lock to avoid conflicts with this.PublishAsync, which may have evicted this very
          // message while it was being published (cap with DropOldestQueuedMessage). The
          // predicate ensures the first item is only removed when it still is this message.
          lock (_messageQueue)
          {
              _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
          }

          if (_storageManager != null)
          {
              await _storageManager.RemoveAsync(message).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Sends the pending unsubscriptions and all desired subscriptions to the server.
  /// On failure the pending flag is set again so the maintenance loop retries, and the
  /// failure handler (if any) is informed.
  /// </summary>
  private async Task SynchronizeSubscriptionsAsync()
  {
      _logger.Info("Synchronizing subscriptions");

      List<TopicFilter> subscriptions;
      HashSet<string> unsubscriptions;

      // Take a snapshot under the lock so Subscribe/Unsubscribe calls can continue while sending.
      lock (_subscriptions)
      {
          subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();

          unsubscriptions = new HashSet<string>(_unsubscriptions);
          _unsubscriptions.Clear();

          _subscriptionsNotPushed = false;
      }

      if (subscriptions.Count == 0 && unsubscriptions.Count == 0)
      {
          return;
      }

      try
      {
          if (unsubscriptions.Count > 0)
          {
              await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
          }

          if (subscriptions.Count > 0)
          {
              await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          _logger.Warning(exception, "Synchronizing subscriptions failed.");
          _subscriptionsNotPushed = true;

          // NOTE(review): the pending unsubscriptions were already cleared above and are not
          // restored here, so a failed unsubscribe is not repeated on the next attempt.

          // Check the local snapshot, not the property. The property can be changed by another
          // thread between the read and the check, which previously could end in a
          // NullReferenceException or a silently skipped notification.
          var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
          if (synchronizingSubscriptionsFailedHandler != null)
          {
              await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Connects the wrapped client when it is not connected.
  /// </summary>
  /// <returns>
  /// <c>StillConnected</c> when no action was needed, <c>Reconnected</c> when a new
  /// connection was established, <c>NotConnected</c> when the attempt failed (the
  /// connecting failed handler, if any, has been informed in that case).
  /// </returns>
  private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  {
      var result = ReconnectionResult.StillConnected;

      if (!_mqttClient.IsConnected)
      {
          try
          {
              await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
              result = ReconnectionResult.Reconnected;
          }
          catch (Exception connectError)
          {
              var failedHandler = ConnectingFailedHandler;
              if (failedHandler != null)
              {
                  await failedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(connectError)).ConfigureAwait(false);
              }

              result = ReconnectionResult.NotConnected;
          }
      }

      return result;
  }
  /// <summary>
  /// Starts a new publisher task, stopping a previously started one first.
  /// </summary>
  private void StartPublishing()
  {
      if (_publishingCancellationToken != null)
      {
          StopPublishing();
      }

      var cts = new CancellationTokenSource();

      // Read the token before the source becomes reachable by StopPublishing. Reading
      // cts.Token lazily inside the lambda could throw ObjectDisposedException when the
      // source is disposed between scheduling the task and the lambda executing.
      var cancellationToken = cts.Token;
      _publishingCancellationToken = cts;

      Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken).Forget(_logger);
  }
  /// <summary>
  /// Cancels the publisher task (if any) and releases its cancellation source.
  /// </summary>
  private void StopPublishing()
  {
      // This method is called from the maintenance loop as well as from StopAsync/Dispose.
      // Detaching the source atomically guarantees that exactly one caller cancels and
      // disposes it; with separate field reads a second caller could invoke Cancel on a
      // source that was disposed in between.
      var cancellationTokenSource = Interlocked.Exchange(ref _publishingCancellationToken, null);
      if (cancellationTokenSource == null)
      {
          return;
      }

      try
      {
          cancellationTokenSource.Cancel(false);
      }
      finally
      {
          cancellationTokenSource.Dispose();
      }
  }
  /// <summary>
  /// Cancels the connection maintenance loop (if running) and releases its cancellation
  /// source. Afterwards <c>IsStarted</c> reports false.
  /// </summary>
  private void StopMaintainingConnection()
  {
      // StopAsync and Dispose may both end up here. Detaching the source atomically makes
      // sure only one caller cancels and disposes it, instead of a second caller invoking
      // Cancel on an already disposed source.
      var cancellationTokenSource = Interlocked.Exchange(ref _connectionCancellationToken, null);
      if (cancellationTokenSource == null)
      {
          return;
      }

      try
      {
          cancellationTokenSource.Cancel(false);
      }
      finally
      {
          cancellationTokenSource.Dispose();
      }
  }
  448. }
  449. }