You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

454 lines
18 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Client.Publishing;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Protocol;
  12. using MQTTnet.Server;
  13. using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
  14. using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
  15. namespace MQTTnet.Extensions.ManagedClient
  16. {
  17. public class ManagedMqttClient : IManagedMqttClient
  18. {
  19. private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
  20. private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  21. private readonly HashSet<string> _unsubscriptions = new HashSet<string>();
  22. private readonly IMqttClient _mqttClient;
  23. private readonly IMqttNetChildLogger _logger;
  24. private CancellationTokenSource _connectionCancellationToken;
  25. private CancellationTokenSource _publishingCancellationToken;
  26. private ManagedMqttClientStorageManager _storageManager;
  27. private bool _subscriptionsNotPushed;
  28. public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
  29. {
  30. if (logger == null) throw new ArgumentNullException(nameof(logger));
  31. _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  32. _mqttClient.Connected += OnConnected;
  33. _mqttClient.Disconnected += OnDisconnected;
  34. _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  35. _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  36. }
  37. public bool IsConnected => _mqttClient.IsConnected;
  38. public bool IsStarted => _connectionCancellationToken != null;
  39. public int PendingApplicationMessagesCount => _messageQueue.Count;
  40. public IManagedMqttClientOptions Options { get; private set; }
  41. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  42. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  43. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  44. public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
  45. public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;
  46. public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
  47. public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
  48. public async Task StartAsync(IManagedMqttClientOptions options)
  49. {
  50. if (options == null) throw new ArgumentNullException(nameof(options));
  51. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  52. if (!options.ClientOptions.CleanSession)
  53. {
  54. throw new NotSupportedException("The managed client does not support existing sessions.");
  55. }
  56. if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");
  57. Options = options;
  58. if (Options.Storage != null)
  59. {
  60. _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
  61. var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
  62. foreach (var message in messages)
  63. {
  64. _messageQueue.Enqueue(message);
  65. }
  66. }
  67. _connectionCancellationToken = new CancellationTokenSource();
  68. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  69. Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token);
  70. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  71. _logger.Info("Started");
  72. }
  73. public Task StopAsync()
  74. {
  75. StopPublishing();
  76. StopMaintainingConnection();
  77. _messageQueue.Clear();
  78. return Task.FromResult(0);
  79. }
  80. public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
  81. {
  82. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  83. await PublishAsync(new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(applicationMessage).Build()).ConfigureAwait(false);
  84. return new MqttClientPublishResult();
  85. }
  86. public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  87. {
  88. if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
  89. ManagedMqttApplicationMessage removedMessage = null;
  90. lock (_messageQueue)
  91. {
  92. if (_messageQueue.Count >= Options.MaxPendingMessages)
  93. {
  94. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  95. {
  96. _logger.Verbose("Skipping publish of new application message because internal queue is full.");
  97. ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
  98. return;
  99. }
  100. if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  101. {
  102. removedMessage = _messageQueue.RemoveFirst();
  103. _logger.Verbose("Removed oldest application message from internal queue because it is full.");
  104. ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(removedMessage));
  105. }
  106. }
  107. _messageQueue.Enqueue(applicationMessage);
  108. }
  109. if (_storageManager != null)
  110. {
  111. if (removedMessage != null)
  112. {
  113. await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
  114. }
  115. await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
  116. }
  117. }
  118. public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  119. {
  120. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  121. lock (_subscriptions)
  122. {
  123. foreach (var topicFilter in topicFilters)
  124. {
  125. _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
  126. _subscriptionsNotPushed = true;
  127. }
  128. }
  129. return Task.FromResult(0);
  130. }
  131. public Task UnsubscribeAsync(IEnumerable<string> topics)
  132. {
  133. if (topics == null) throw new ArgumentNullException(nameof(topics));
  134. lock (_subscriptions)
  135. {
  136. foreach (var topic in topics)
  137. {
  138. if (_subscriptions.Remove(topic))
  139. {
  140. _unsubscriptions.Add(topic);
  141. _subscriptionsNotPushed = true;
  142. }
  143. }
  144. }
  145. return Task.FromResult(0);
  146. }
  147. public void Dispose()
  148. {
  149. _connectionCancellationToken?.Dispose();
  150. _publishingCancellationToken?.Dispose();
  151. }
  152. private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  153. {
  154. try
  155. {
  156. while (!cancellationToken.IsCancellationRequested)
  157. {
  158. await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
  159. }
  160. }
  161. catch (OperationCanceledException)
  162. {
  163. }
  164. catch (Exception exception)
  165. {
  166. _logger.Error(exception, "Unhandled exception while maintaining connection.");
  167. }
  168. finally
  169. {
  170. try
  171. {
  172. await _mqttClient.DisconnectAsync().ConfigureAwait(false);
  173. }
  174. catch (Exception exception)
  175. {
  176. _logger.Error(exception, "Error while disconnecting.");
  177. }
  178. _logger.Info("Stopped");
  179. }
  180. }
  181. private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  182. {
  183. try
  184. {
  185. var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
  186. if (connectionState == ReconnectionResult.NotConnected)
  187. {
  188. StopPublishing();
  189. await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  190. return;
  191. }
  192. if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
  193. {
  194. await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
  195. StartPublishing();
  196. return;
  197. }
  198. if (connectionState == ReconnectionResult.StillConnected)
  199. {
  200. await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
  201. }
  202. }
  203. catch (OperationCanceledException)
  204. {
  205. }
  206. catch (MqttCommunicationException exception)
  207. {
  208. _logger.Warning(exception, "Communication exception while maintaining connection.");
  209. }
  210. catch (Exception exception)
  211. {
  212. _logger.Error(exception, "Unhandled exception while maintaining connection.");
  213. }
  214. }
  215. private void PublishQueuedMessages(CancellationToken cancellationToken)
  216. {
  217. try
  218. {
  219. while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
  220. {
  221. //Peek at the message without dequeueing in order to prevent the
  222. //possibility of the queue growing beyond the configured cap.
  223. //Previously, messages could be re-enqueued if there was an
  224. //exception, and this re-enqueueing did not honor the cap.
  225. //Furthermore, because re-enqueueing would shuffle the order
  226. //of the messages, the DropOldestQueuedMessage strategy would
  227. //be unable to know which message is actually the oldest and would
  228. //instead drop the first item in the queue.
  229. var message = _messageQueue.PeekAndWait();
  230. if (message == null)
  231. {
  232. continue;
  233. }
  234. cancellationToken.ThrowIfCancellationRequested();
  235. TryPublishQueuedMessage(message);
  236. }
  237. }
  238. catch (OperationCanceledException)
  239. {
  240. }
  241. catch (Exception exception)
  242. {
  243. _logger.Error(exception, "Unhandled exception while publishing queued application messages.");
  244. }
  245. finally
  246. {
  247. _logger.Verbose("Stopped publishing messages.");
  248. }
  249. }
  250. private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)
  251. {
  252. Exception transmitException = null;
  253. try
  254. {
  255. _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();
  256. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
  257. {
  258. //While publishing this message, this.PublishAsync could have booted this
  259. //message off the queue to make room for another (when using a cap
  260. //with the DropOldestQueuedMessage strategy). If the first item
  261. //in the queue is equal to this message, then it's safe to remove
  262. //it from the queue. If not, that means this.PublishAsync has already
  263. //removed it, in which case we don't want to do anything.
  264. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  265. }
  266. _storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
  267. }
  268. catch (MqttCommunicationException exception)
  269. {
  270. transmitException = exception;
  271. _logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");
  272. if (message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  273. {
  274. //If QoS 0, we don't want this message to stay on the queue.
  275. //If QoS 1 or 2, it's possible that, when using a cap, this message
  276. //has been booted off the queue by this.PublishAsync, in which case this
  277. //thread will not continue to try to publish it. While this does
  278. //contradict the expected behavior of QoS 1 and 2, that's also true
  279. //for the usage of a message queue cap, so it's still consistent
  280. //with prior behavior in that way.
  281. lock (_messageQueue) //lock to avoid conflict with this.PublishAsync
  282. {
  283. _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
  284. }
  285. }
  286. }
  287. catch (Exception exception)
  288. {
  289. transmitException = exception;
  290. _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id}).");
  291. }
  292. finally
  293. {
  294. ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
  295. }
  296. }
  297. private async Task SynchronizeSubscriptionsAsync()
  298. {
  299. _logger.Info(nameof(ManagedMqttClient), "Synchronizing subscriptions");
  300. List<TopicFilter> subscriptions;
  301. HashSet<string> unsubscriptions;
  302. lock (_subscriptions)
  303. {
  304. subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();
  305. unsubscriptions = new HashSet<string>(_unsubscriptions);
  306. _unsubscriptions.Clear();
  307. _subscriptionsNotPushed = false;
  308. }
  309. if (!subscriptions.Any() && !unsubscriptions.Any())
  310. {
  311. return;
  312. }
  313. try
  314. {
  315. if (unsubscriptions.Any())
  316. {
  317. await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
  318. }
  319. if (subscriptions.Any())
  320. {
  321. await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
  322. }
  323. }
  324. catch (Exception exception)
  325. {
  326. _logger.Warning(exception, "Synchronizing subscriptions failed.");
  327. _subscriptionsNotPushed = true;
  328. SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
  329. }
  330. }
  331. private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  332. {
  333. if (_mqttClient.IsConnected)
  334. {
  335. return ReconnectionResult.StillConnected;
  336. }
  337. try
  338. {
  339. await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
  340. return ReconnectionResult.Reconnected;
  341. }
  342. catch (Exception exception)
  343. {
  344. ConnectingFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
  345. return ReconnectionResult.NotConnected;
  346. }
  347. }
  348. private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  349. {
  350. ApplicationMessageReceived?.Invoke(this, eventArgs);
  351. }
  352. private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
  353. {
  354. Disconnected?.Invoke(this, eventArgs);
  355. }
  356. private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
  357. {
  358. Connected?.Invoke(this, eventArgs);
  359. }
  360. private void StartPublishing()
  361. {
  362. if (_publishingCancellationToken != null)
  363. {
  364. StopPublishing();
  365. }
  366. var cts = new CancellationTokenSource();
  367. _publishingCancellationToken = cts;
  368. Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
  369. }
  370. private void StopPublishing()
  371. {
  372. _publishingCancellationToken?.Cancel(false);
  373. _publishingCancellationToken?.Dispose();
  374. _publishingCancellationToken = null;
  375. }
  376. private void StopMaintainingConnection()
  377. {
  378. _connectionCancellationToken?.Cancel(false);
  379. _connectionCancellationToken?.Dispose();
  380. _connectionCancellationToken = null;
  381. }
  382. }
  383. }