You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

470 line
18 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Client.Connecting;
  8. using MQTTnet.Client.Disconnecting;
  9. using MQTTnet.Client.Publishing;
  10. using MQTTnet.Client.Receiving;
  11. using MQTTnet.Diagnostics;
  12. using MQTTnet.Exceptions;
  13. using MQTTnet.Internal;
  14. using MQTTnet.Protocol;
  15. using MQTTnet.Server;
  16. using MqttClientConnectedEventArgs = MQTTnet.Client.Connecting.MqttClientConnectedEventArgs;
  17. using MqttClientDisconnectedEventArgs = MQTTnet.Client.Disconnecting.MqttClientDisconnectedEventArgs;
  18. namespace MQTTnet.Extensions.ManagedClient
  19. {
  20. public class ManagedMqttClient : IManagedMqttClient
  21. {
  // Collaborators handed in through the constructor.
  private readonly IMqttClient _mqttClient;
  private readonly IMqttNetChildLogger _logger;

  // Outgoing messages waiting to be published. The queue instance is also used as the lock object
  // that guards the overflow handling in PublishAsync and the removals done by the publishing worker.
  private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

  // Requested subscriptions (topic -> QoS) and topics waiting to be unsubscribed.
  // Both collections are accessed under lock (_subscriptions).
  private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
  private readonly HashSet<string> _unsubscriptions = new HashSet<string>();

  // Set whenever the collections above change; cleared when SynchronizeSubscriptionsAsync takes its snapshot.
  private bool _subscriptionsNotPushed;

  // Non-null while the connection maintenance loop is supposed to run (this is what IsStarted reports).
  private CancellationTokenSource _connectionCancellationToken;

  // Non-null while a publishing worker has been started and not yet stopped.
  private CancellationTokenSource _publishingCancellationToken;

  // Created by StartAsync when the options provide a storage; stays null otherwise.
  private ManagedMqttClientStorageManager _storageManager;
  /// <summary>
  /// Wraps the given client, registers this instance's connected, disconnected and
  /// application-message handlers on it, and creates a child logger for this class.
  /// </summary>
  /// <param name="mqttClient">The client used for all broker communication.</param>
  /// <param name="logger">The parent logger from which a child logger is created.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="logger"/> or <paramref name="mqttClient"/> is null.
  /// </exception>
  public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
  {
      // The logger is validated first, so it wins when both arguments are null.
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (mqttClient == null)
      {
          throw new ArgumentNullException(nameof(mqttClient));
      }

      _mqttClient = mqttClient;
      _mqttClient.UseConnectedHandler(OnConnected);
      _mqttClient.UseDisconnectedHandler(OnDisconnected);
      _mqttClient.UseReceivedApplicationMessageHandler(OnApplicationMessageReceived);

      _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
  }
  /// <summary>Reports whether the wrapped client is currently connected.</summary>
  public bool IsConnected
  {
      get { return _mqttClient.IsConnected; }
  }

  /// <summary>True between a successful <c>StartAsync</c> and the next stop.</summary>
  public bool IsStarted
  {
      get { return _connectionCancellationToken != null; }
  }

  /// <summary>Number of messages currently held in the internal queue.</summary>
  public int PendingApplicationMessagesCount
  {
      get { return _messageQueue.Count; }
  }

  /// <summary>The options passed to the most recent <c>StartAsync</c> call; null before the first start.</summary>
  public IManagedMqttClientOptions Options { get; private set; }

  /// <summary>Optional handler awaited after the <see cref="Connected"/> event has been raised.</summary>
  public IMqttClientConnectedHandler ConnectedHandler { get; set; }

  /// <summary>Raised when the wrapped client reports a connection.</summary>
  public event EventHandler<MqttClientConnectedEventArgs> Connected;

  /// <summary>Optional handler awaited after the <see cref="Disconnected"/> event has been raised.</summary>
  public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }

  /// <summary>Raised when the wrapped client reports a disconnect.</summary>
  public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

  /// <summary>Reads and writes the received-message handler of the wrapped client directly.</summary>
  public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler
  {
      get { return _mqttClient.ReceivedApplicationMessageHandler; }
      set { _mqttClient.ReceivedApplicationMessageHandler = value; }
  }

  /// <summary>Raised for every application message forwarded by the wrapped client.</summary>
  public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

  /// <summary>Raised after each attempt to publish a queued message, whether it succeeded or failed.</summary>
  public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;

  /// <summary>Raised when a message is dropped because the internal queue is full.</summary>
  public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;

  /// <summary>Raised when connecting the wrapped client throws.</summary>
  public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;

  /// <summary>Raised when pushing subscriptions or unsubscriptions to the broker throws.</summary>
  public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
  /// <summary>
  /// Starts the managed client: stores the options, loads persisted messages into the queue
  /// (when a storage is configured) and launches the background connection maintenance loop.
  /// </summary>
  /// <param name="options">The managed client options. <c>ClientOptions</c> must be set and use a clean session.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException"><c>options.ClientOptions</c> is null.</exception>
  /// <exception cref="NotSupportedException"><c>options.ClientOptions.CleanSession</c> is false.</exception>
  /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
  public async Task StartAsync(IManagedMqttClientOptions options)
  {
      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

      if (!options.ClientOptions.CleanSession)
      {
          throw new NotSupportedException("The managed client does not support existing sessions.");
      }

      if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

      Options = options;

      if (Options.Storage != null)
      {
          _storageManager = new ManagedMqttClientStorageManager(Options.Storage);
          var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

          foreach (var message in messages)
          {
              _messageQueue.Enqueue(message);
          }
      }
      else
      {
          // Do not keep a storage manager from an earlier start that used different options.
          _storageManager = null;
      }

      // Capture the token in a local before starting the task. Reading the field from inside the
      // lambda would fail with a NullReferenceException if the client is stopped before the task runs.
      var cancellationTokenSource = new CancellationTokenSource();
      var cancellationToken = cancellationTokenSource.Token;
      _connectionCancellationToken = cancellationTokenSource;

      // Intentionally not awaited: the loop runs in the background until the token is cancelled.
      _ = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);

      _logger.Info("Started");
  }
  /// <summary>
  /// Stops the publishing worker and the connection maintenance loop, then empties the message queue.
  /// All work is done synchronously; the returned task is already completed.
  /// </summary>
  /// <returns>A completed task.</returns>
  public Task StopAsync()
  {
      StopPublishing();
      StopMaintainingConnection();

      _messageQueue.Clear();

      Task completed = Task.FromResult(0);
      return completed;
  }
  /// <summary>
  /// Wraps the message in a managed message and places it on the internal queue.
  /// The returned result is a default instance; it does not describe the actual delivery.
  /// </summary>
  /// <param name="applicationMessage">The message to queue.</param>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>A new, default <c>MqttClientPublishResult</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
  public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
  {
      if (applicationMessage == null)
      {
          throw new ArgumentNullException(nameof(applicationMessage));
      }

      var managedMessage = new ManagedMqttApplicationMessageBuilder()
          .WithApplicationMessage(applicationMessage)
          .Build();

      await PublishAsync(managedMessage).ConfigureAwait(false);

      return new MqttClientPublishResult();
  }
  /// <summary>
  /// Places a managed message on the internal queue and, when a storage manager exists, persists it.
  /// When the queue already holds <c>Options.MaxPendingMessages</c> entries, the configured overflow
  /// strategy decides whether the new message or the oldest queued message is dropped.
  /// </summary>
  /// <param name="applicationMessage">The message to queue.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is null.</exception>
  /// <exception cref="InvalidOperationException">No options are available because the client was never started.</exception>
  public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
  {
      if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

      // Options is only assigned by StartAsync. Fail with a clear message instead of a NullReferenceException.
      var options = Options;
      if (options == null) throw new InvalidOperationException("The managed client must be started before messages can be published.");

      ManagedMqttApplicationMessage removedMessage = null;
      ApplicationMessageSkippedEventArgs skippedEventArgs = null;
      var newMessageDropped = false;

      lock (_messageQueue)
      {
          if (_messageQueue.Count >= options.MaxPendingMessages)
          {
              if (options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
              {
                  _logger.Verbose("Skipping publish of new application message because internal queue is full.");
                  skippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
                  newMessageDropped = true;
              }
              else if (options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
              {
                  removedMessage = _messageQueue.RemoveFirst();
                  _logger.Verbose("Removed oldest application message from internal queue because it is full.");
                  skippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
              }
          }

          if (!newMessageDropped)
          {
              _messageQueue.Enqueue(applicationMessage);
          }
      }

      // Raise the event only after the lock is released, so that subscriber code
      // never runs while the queue lock (shared with the publishing worker) is held.
      if (skippedEventArgs != null)
      {
          ApplicationMessageSkipped?.Invoke(this, skippedEventArgs);
      }

      if (newMessageDropped)
      {
          return;
      }

      if (_storageManager != null)
      {
          if (removedMessage != null)
          {
              await _storageManager.RemoveAsync(removedMessage).ConfigureAwait(false);
          }

          await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Records the given topic filters as requested subscriptions and marks the subscriptions as
  /// not yet pushed. Nothing is sent to the broker here; the returned task is already completed.
  /// </summary>
  /// <param name="topicFilters">The filters to record; an existing entry for the same topic is overwritten.</param>
  /// <returns>A completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      lock (_subscriptions)
      {
          foreach (var filter in topicFilters)
          {
              var topic = filter.Topic;
              _subscriptions[topic] = filter.QualityOfServiceLevel;

              // The flag is only raised when at least one filter was actually recorded.
              _subscriptionsNotPushed = true;
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Removes the given topics from the requested subscriptions. Every topic that was actually
  /// present is remembered for unsubscription and the subscriptions are marked as not yet pushed.
  /// Nothing is sent to the broker here; the returned task is already completed.
  /// </summary>
  /// <param name="topics">The topics to remove; unknown topics are ignored.</param>
  /// <returns>A completed task.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topics"/> is null.</exception>
  public Task UnsubscribeAsync(IEnumerable<string> topics)
  {
      if (topics == null)
      {
          throw new ArgumentNullException(nameof(topics));
      }

      lock (_subscriptions)
      {
          foreach (var candidate in topics)
          {
              var wasSubscribed = _subscriptions.Remove(candidate);
              if (!wasSubscribed)
              {
                  continue;
              }

              _unsubscriptions.Add(candidate);
              _subscriptionsNotPushed = true;
          }
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Stops the publishing worker and the connection maintenance loop and releases their
  /// cancellation token sources. Safe to call more than once.
  /// </summary>
  public void Dispose()
  {
      // Disposing a CancellationTokenSource does not cancel it, so the background loops
      // would keep running. The stop helpers cancel first, then dispose, then clear the field,
      // which also prevents a later StopAsync from touching an already disposed source.
      StopPublishing();
      StopMaintainingConnection();
  }
  /// <summary>
  /// Background loop that keeps calling <c>TryMaintainConnectionAsync</c> until cancellation is
  /// requested. On exit, for any reason, it disconnects the wrapped client and logs "Stopped".
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (true)
          {
              if (cancellationToken.IsCancellationRequested)
              {
                  break;
              }

              await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Cancellation is the regular way to leave the loop.
      }
      catch (Exception loopException)
      {
          _logger.Error(loopException, "Unhandled exception while maintaining connection.");
      }
      finally
      {
          try
          {
              await _mqttClient.DisconnectAsync().ConfigureAwait(false);
          }
          catch (Exception disconnectException)
          {
              _logger.Error(disconnectException, "Error while disconnecting.");
          }

          _logger.Info("Stopped");
      }
  }
  /// <summary>
  /// Performs one maintenance step: reconnects if required, then either waits before the next
  /// reconnect attempt, pushes subscriptions and (re)starts publishing, or waits for the next
  /// connection check. Exceptions are logged (cancellation is ignored) and never propagate.
  /// </summary>
  /// <param name="cancellationToken">Token that aborts the delays.</param>
  private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
  {
      try
      {
          var state = await ReconnectIfRequiredAsync().ConfigureAwait(false);

          if (state == ReconnectionResult.NotConnected)
          {
              // Connecting failed: stop the publisher and back off before trying again.
              StopPublishing();
              await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
              return;
          }

          if (state != ReconnectionResult.Reconnected && !_subscriptionsNotPushed)
          {
              // Nothing to synchronize; idle until the next connection check.
              if (state == ReconnectionResult.StillConnected)
              {
                  await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
              }

              return;
          }

          // Fresh connection or pending subscription changes.
          await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
          StartPublishing();
      }
      catch (OperationCanceledException)
      {
          // The caller's loop checks the token itself.
      }
      catch (MqttCommunicationException communicationException)
      {
          _logger.Warning(communicationException, "Communication exception while maintaining connection.");
      }
      catch (Exception unexpectedException)
      {
          _logger.Error(unexpectedException, "Unhandled exception while maintaining connection.");
      }
  }
  /// <summary>
  /// Worker loop that publishes queued messages one at a time for as long as the token is not
  /// cancelled and the wrapped client is connected.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the worker.</param>
  private void PublishQueuedMessages(CancellationToken cancellationToken)
  {
      try
      {
          while (true)
          {
              if (cancellationToken.IsCancellationRequested || !_mqttClient.IsConnected)
              {
                  break;
              }

              //The head of the queue is only peeked, not dequeued. Earlier implementations
              //dequeued and re-enqueued on failure, which ignored the configured queue cap and
              //shuffled the message order, so the DropOldestQueuedMessage strategy could no
              //longer tell which message really was the oldest.
              var head = _messageQueue.PeekAndWait();
              if (head != null)
              {
                  cancellationToken.ThrowIfCancellationRequested();
                  TryPublishQueuedMessage(head);
              }
          }
      }
      catch (OperationCanceledException)
      {
          // Cancellation is the regular way to stop the worker.
      }
      catch (Exception workerException)
      {
          _logger.Error(workerException, "Unhandled exception while publishing queued application messages.");
      }
      finally
      {
          _logger.Verbose("Stopped publishing messages.");
      }
  }
  /// <summary>
  /// Publishes one queued message synchronously. On success the message is removed from the queue
  /// and from storage. On a communication failure only QoS 0 messages are removed from the queue.
  /// <c>ApplicationMessageProcessed</c> is raised in every case, carrying the exception if one occurred.
  /// </summary>
  /// <param name="message">The message to publish, as peeked from the queue by the caller.</param>
  private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)
  {
      //While this message was being published, this.PublishAsync may already have dropped it to
      //make room for a newer one (cap with the DropOldestQueuedMessage strategy). The removal is
      //therefore conditional on the message id, and it takes the same lock as this.PublishAsync.
      void RemoveFromQueue()
      {
          lock (_messageQueue)
          {
              _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id));
          }
      }

      Exception failure = null;

      try
      {
          _mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult();

          RemoveFromQueue();

          _storageManager?.RemoveAsync(message).GetAwaiter().GetResult();
      }
      catch (MqttCommunicationException communicationException)
      {
          failure = communicationException;
          _logger.Warning(communicationException, $"Publishing application ({message.Id}) message failed.");

          //QoS 0 messages must not stay queued after a failed attempt. QoS 1 and 2 messages stay
          //queued for a retry unless the cap has already pushed them out, which matches the
          //behavior a capped queue had before.
          var isFireAndForget = message.ApplicationMessage.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce;
          if (isFireAndForget)
          {
              RemoveFromQueue();
          }
      }
      catch (Exception unexpectedException)
      {
          failure = unexpectedException;
          _logger.Error(unexpectedException, $"Unhandled exception while publishing application message ({message.Id}).");
      }
      finally
      {
          ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, failure));
      }
  }
  /// <summary>
  /// Sends the recorded unsubscriptions and then the recorded subscriptions to the broker.
  /// On failure a warning is logged, the pending state is restored so that a later call retries,
  /// and <c>SynchronizingSubscriptionsFailed</c> is raised. Exceptions do not propagate.
  /// </summary>
  private async Task SynchronizeSubscriptionsAsync()
  {
      // The first argument of the logger methods is the message itself (as in "Started" and
      // "Stopped" elsewhere in this class), so the text must not be passed as a format parameter.
      _logger.Info("Synchronizing subscriptions");

      List<TopicFilter> subscriptions;
      HashSet<string> unsubscriptions;

      // Take a snapshot under the lock, then talk to the broker without holding it.
      lock (_subscriptions)
      {
          subscriptions = _subscriptions.Select(i => new TopicFilter { Topic = i.Key, QualityOfServiceLevel = i.Value }).ToList();

          unsubscriptions = new HashSet<string>(_unsubscriptions);
          _unsubscriptions.Clear();

          _subscriptionsNotPushed = false;
      }

      if (subscriptions.Count == 0 && unsubscriptions.Count == 0)
      {
          return;
      }

      // Tracks whether the unsubscribe step is done, so that a failure knows what to restore.
      var unsubscribeCompleted = unsubscriptions.Count == 0;

      try
      {
          if (!unsubscribeCompleted)
          {
              await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
              unsubscribeCompleted = true;
          }

          if (subscriptions.Count > 0)
          {
              await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          _logger.Warning(exception, "Synchronizing subscriptions failed.");

          lock (_subscriptions)
          {
              if (!unsubscribeCompleted)
              {
                  // The snapshot was already cleared from the shared set. Put the topics back,
                  // otherwise the unsubscription would never be retried.
                  foreach (var topic in unsubscriptions)
                  {
                      _unsubscriptions.Add(topic);
                  }
              }

              _subscriptionsNotPushed = true;
          }

          SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
      }
  }
  /// <summary>
  /// Connects the wrapped client with <c>Options.ClientOptions</c> unless it is already connected.
  /// A failed attempt raises <c>ConnectingFailed</c> and is reported through the return value.
  /// </summary>
  /// <returns>
  /// <c>StillConnected</c> when no connect was needed, <c>Reconnected</c> when connecting
  /// succeeded, <c>NotConnected</c> when connecting threw.
  /// </returns>
  private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  {
      if (!_mqttClient.IsConnected)
      {
          try
          {
              await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception connectException)
          {
              var failedArgs = new MqttManagedProcessFailedEventArgs(connectException);
              ConnectingFailed?.Invoke(this, failedArgs);
              return ReconnectionResult.NotConnected;
          }
      }

      return ReconnectionResult.StillConnected;
  }
  /// <summary>
  /// Handler registered on the wrapped client; re-raises each received message as
  /// <c>ApplicationMessageReceived</c> with this instance as the sender.
  /// </summary>
  /// <param name="context">Carries the sender client id and the received message.</param>
  /// <returns>A completed task.</returns>
  private Task OnApplicationMessageReceived(MqttApplicationMessageHandlerContext context)
  {
      var subscribers = ApplicationMessageReceived;
      if (subscribers != null)
      {
          // The event args are only built when somebody is listening.
          subscribers.Invoke(this, new MqttApplicationMessageReceivedEventArgs(context.SenderClientId, context.ApplicationMessage));
      }

      return Task.FromResult(0);
  }
  /// <summary>
  /// Handler registered on the wrapped client; raises <c>Disconnected</c> and then delegates to
  /// <c>DisconnectedHandler</c> when one is set.
  /// </summary>
  /// <param name="eventArgs">The disconnect details supplied by the wrapped client.</param>
  /// <returns>The handler's task, or a completed task when no handler is set. Never null.</returns>
  private Task OnDisconnected(MqttClientDisconnectedEventArgs eventArgs)
  {
      Disconnected?.Invoke(this, eventArgs);

      // Without a handler the null-conditional call yields null, and awaiting a null Task
      // throws a NullReferenceException in the caller. Fall back to a completed task.
      return DisconnectedHandler?.HandleDisconnectedAsync(eventArgs) ?? Task.FromResult(0);
  }
  /// <summary>
  /// Handler registered on the wrapped client; raises <c>Connected</c> and then delegates to
  /// <c>ConnectedHandler</c> when one is set.
  /// </summary>
  /// <param name="eventArgs">The connection details supplied by the wrapped client.</param>
  /// <returns>The handler's task, or a completed task when no handler is set. Never null.</returns>
  private Task OnConnected(MqttClientConnectedEventArgs eventArgs)
  {
      Connected?.Invoke(this, eventArgs);

      // Without a handler the null-conditional call yields null, and awaiting a null Task
      // throws a NullReferenceException in the caller. Fall back to a completed task.
      return ConnectedHandler?.HandleConnectedAsync(eventArgs) ?? Task.FromResult(0);
  }
  /// <summary>
  /// Starts a new publishing worker on a long-running task, after stopping a previous worker
  /// if one is registered.
  /// </summary>
  private void StartPublishing()
  {
      if (_publishingCancellationToken != null)
      {
          StopPublishing();
      }

      var workerCancellation = new CancellationTokenSource();
      _publishingCancellationToken = workerCancellation;

      // The worker uses the local source, so replacing the field later does not affect it.
      Task.Factory.StartNew(
          () => PublishQueuedMessages(workerCancellation.Token),
          workerCancellation.Token,
          TaskCreationOptions.LongRunning,
          TaskScheduler.Default);
  }
  /// <summary>
  /// Cancels and disposes the publishing worker's token source, if any, and clears the field.
  /// Does nothing when no worker is registered.
  /// </summary>
  private void StopPublishing()
  {
      var workerCancellation = _publishingCancellationToken;
      if (workerCancellation != null)
      {
          workerCancellation.Cancel(false);
          workerCancellation.Dispose();
      }

      _publishingCancellationToken = null;
  }
  /// <summary>
  /// Cancels and disposes the connection maintenance token source, if any, and clears the field,
  /// which makes <c>IsStarted</c> report false. Does nothing when the client is not started.
  /// </summary>
  private void StopMaintainingConnection()
  {
      var connectionCancellation = _connectionCancellationToken;
      if (connectionCancellation != null)
      {
          connectionCancellation.Cancel(false);
          connectionCancellation.Dispose();
      }

      _connectionCancellationToken = null;
  }
  395. }
  396. }