You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

388 lines
14 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Protocol;
  12. namespace MQTTnet.ManagedClient
  13. {
  /// <summary>
  /// Wraps an <see cref="IMqttClient"/> and maintains its connection from a background task:
  /// it reconnects when the connection is lost, pushes the registered subscriptions to the
  /// broker and publishes application messages from an internal queue.
  /// </summary>
  public class ManagedMqttClient : IManagedMqttClient
  {
      // Messages waiting to be published by the background publisher.
      private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();

      // Desired subscriptions (topic -> QoS); sent to the broker on every synchronization.
      private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();

      // Guards _subscriptions and _unsubscriptions.
      private readonly AsyncLock _subscriptionsLock = new AsyncLock();

      // Topics removed since the last synchronization.
      private readonly List<string> _unsubscriptions = new List<string>();

      private readonly IMqttClient _mqttClient;
      private readonly IMqttNetChildLogger _logger;

      // Non-null while the connection maintenance loop is considered started.
      private CancellationTokenSource _connectionCancellationToken;

      // Non-null while a publisher loop is considered running.
      private CancellationTokenSource _publishingCancellationToken;

      private ManagedMqttClientStorageManager _storageManager;
      private IManagedMqttClientOptions _options;

      // Set whenever the subscription state changed and still has to be sent to the broker.
      private bool _subscriptionsNotPushed;

      /// <summary>
      /// Creates a managed client on top of <paramref name="mqttClient"/> and forwards its
      /// connected, disconnected and message-received events.
      /// </summary>
      /// <param name="mqttClient">The client used for all communication.</param>
      /// <param name="logger">The logger from which a child logger is created.</param>
      /// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
      public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
      {
          if (logger == null) throw new ArgumentNullException(nameof(logger));

          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
          _mqttClient.Connected += OnConnected;
          _mqttClient.Disconnected += OnDisconnected;
          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;

          _logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
      }

      /// <summary>Whether the wrapped client currently reports a connection.</summary>
      public bool IsConnected => _mqttClient.IsConnected;

      /// <summary>Whether <see cref="StartAsync"/> was called without a following <see cref="StopAsync"/>.</summary>
      public bool IsStarted => _connectionCancellationToken != null;

      public event EventHandler<MqttClientConnectedEventArgs> Connected;

      public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

      public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

      /// <summary>Raised after every publish attempt of a queued message, successful or not.</summary>
      public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;

      /// <summary>Raised when sending the subscriptions to the broker failed.</summary>
      public event EventHandler SynchronizingSubscriptionsFailed;

      /// <summary>
      /// Loads previously stored messages (when a storage is configured) into the queue and
      /// starts the background task which maintains the connection.
      /// </summary>
      /// <param name="options">The managed client options; the client options must use a clean session.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
      /// <exception cref="ArgumentException">The client options are not set.</exception>
      /// <exception cref="NotSupportedException">The client options do not request a clean session.</exception>
      /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
      public async Task StartAsync(IManagedMqttClientOptions options)
      {
          if (options == null) throw new ArgumentNullException(nameof(options));
          if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

          if (!options.ClientOptions.CleanSession)
          {
              throw new NotSupportedException("The managed client does not support existing sessions.");
          }

          if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

          _options = options;

          // Reset first so that a storage manager from an earlier start is not reused
          // when the new options do not configure a storage.
          _storageManager = null;
          if (_options.Storage != null)
          {
              _storageManager = new ManagedMqttClientStorageManager(_options.Storage);

              var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
              foreach (var message in messages)
              {
                  _messageQueue.Add(message);
              }
          }

          var cancellationTokenSource = new CancellationTokenSource();
          _connectionCancellationToken = cancellationTokenSource;

          // Capture the token now: the field may already be null or disposed again
          // by the time the background task starts executing.
          var cancellationToken = cancellationTokenSource.Token;
          _ = Task.Run(() => MaintainConnectionAsync(cancellationToken), cancellationToken);

          _logger.Info("Started");
      }

      /// <summary>
      /// Cancels the publisher and the connection maintenance loop and discards all
      /// messages which are still in the queue.
      /// </summary>
      public Task StopAsync()
      {
          StopPublishing();
          StopMaintainingConnection();

          // TryTake never blocks. The previous Any()/Take() pair could block forever when the
          // publisher removed the last message between the two calls.
          while (_messageQueue.TryTake(out _))
          {
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Adds the messages to the queue (and to the storage, when one is configured).
      /// </summary>
      /// <param name="applicationMessages">The messages to enqueue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is <c>null</c>.</exception>
      public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
      {
          if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

          foreach (var applicationMessage in applicationMessages)
          {
              if (_storageManager != null)
              {
                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }

              _messageQueue.Add(applicationMessage);
          }
      }

      /// <summary>
      /// Registers the topic filters; they are sent to the broker by the maintenance loop.
      /// </summary>
      /// <param name="topicFilters">The topic filters to subscribe to.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
      public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
          {
              foreach (var topicFilter in topicFilters)
              {
                  // Drop a pending unsubscription of the same topic. The synchronization sends
                  // subscriptions first and unsubscriptions afterwards, so a stale entry would
                  // immediately undo this subscription.
                  _unsubscriptions.Remove(topicFilter.Topic);

                  _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
                  _subscriptionsNotPushed = true;
              }
          }
      }

      /// <summary>
      /// Removes the topics from the registered subscriptions; topics which were registered
      /// are queued for unsubscription by the maintenance loop.
      /// </summary>
      /// <param name="topics">The topics to unsubscribe from.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
      public async Task UnsubscribeAsync(IEnumerable<string> topics)
      {
          if (topics == null) throw new ArgumentNullException(nameof(topics));

          using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
          {
              foreach (var topic in topics)
              {
                  if (_subscriptions.Remove(topic))
                  {
                      _unsubscriptions.Add(topic);
                      _subscriptionsNotPushed = true;
                  }
              }
          }
      }

      /// <summary>
      /// Stops the background loops, detaches from the wrapped client's events and releases
      /// the queue and the subscription lock. The wrapped client itself is not disposed.
      /// </summary>
      public void Dispose()
      {
          // Cancel before disposing; otherwise the loops keep running on disposed token sources.
          StopPublishing();
          StopMaintainingConnection();

          // The wrapped client was passed in from outside; do not keep this instance alive through it.
          _mqttClient.Connected -= OnConnected;
          _mqttClient.Disconnected -= OnDisconnected;
          _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;

          _messageQueue.Dispose();
          _subscriptionsLock.Dispose();
      }

      /// <summary>
      /// Runs the maintenance step in a loop until cancellation is requested and disconnects
      /// the wrapped client afterwards.
      /// </summary>
      private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Unhandled exception while maintaining connection.");
          }
          finally
          {
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  // This method runs as a fire-and-forget task, so nobody would observe the exception.
                  _logger.Warning(exception, "Disconnecting after the connection maintenance ended failed.");
              }

              _logger.Info("Stopped");
          }
      }

      /// <summary>
      /// Performs one maintenance step: reconnects if required, synchronizes the subscriptions
      /// and (re)starts the publisher, or waits before the next step. Exceptions are logged
      /// and not propagated.
      /// </summary>
      private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);

              if (connectionState == ReconnectionResult.NotConnected)
              {
                  StopPublishing();
                  await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  return;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await SynchronizeSubscriptionsAsync().ConfigureAwait(false);

                  // Do not start a new publisher when a stop was requested in the meantime.
                  if (!cancellationToken.IsCancellationRequested)
                  {
                      StartPublishing();
                  }

                  return;
              }

              if (connectionState == ReconnectionResult.StillConnected)
              {
                  await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // The calling loop checks the token and terminates.
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning(exception, "Communication exception while maintaining connection.");
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Unhandled exception while maintaining connection.");
          }
      }

      /// <summary>
      /// Takes messages from the queue and publishes them until cancellation is requested.
      /// </summary>
      private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  var message = _messageQueue.Take(cancellationToken);
                  if (message == null)
                  {
                      continue;
                  }

                  // A message which was already removed from the queue is always handed to the
                  // client, even when cancellation was requested in the meantime. Skipping it
                  // would lose the message without any notification.
                  await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error(exception, "Unhandled exception while publishing queued application messages.");
          }
          finally
          {
              _logger.Verbose("Stopped publishing messages.");
          }
      }

      /// <summary>
      /// Publishes a single message, removes it from the storage on success and raises
      /// <see cref="ApplicationMessageProcessed"/> with the exception of the attempt (if any).
      /// </summary>
      private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
      {
          Exception transmitException = null;
          try
          {
              await _mqttClient.PublishAsync(message).ConfigureAwait(false);

              if (_storageManager != null)
              {
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
          catch (MqttCommunicationException exception)
          {
              transmitException = exception;

              _logger.Warning(exception, "Publishing application message failed.");

              // Only messages above "at most once" are queued again for another attempt.
              // NOTE(review): a message which is dropped here (or in the handler below) is not
              // removed from the storage - confirm whether that is intended.
              if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  _messageQueue.Add(message);
              }
          }
          catch (Exception exception)
          {
              transmitException = exception;
              _logger.Error(exception, "Unhandled exception while publishing queued application message.");
          }
          finally
          {
              ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
          }
      }

      /// <summary>
      /// Sends all registered subscriptions and the pending unsubscriptions to the broker.
      /// On failure the state is flagged as not pushed again and
      /// <see cref="SynchronizingSubscriptionsFailed"/> is raised.
      /// </summary>
      private async Task SynchronizeSubscriptionsAsync()
      {
          // The message is the first argument, as in every other logger call of this class.
          _logger.Info("Synchronizing subscriptions");

          List<TopicFilter> subscriptions;
          List<string> unsubscriptions;

          // Take a snapshot under the lock so that the network calls run without holding it.
          using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
          {
              subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();

              unsubscriptions = new List<string>(_unsubscriptions);
              _unsubscriptions.Clear();

              _subscriptionsNotPushed = false;
          }

          if (!subscriptions.Any() && !unsubscriptions.Any())
          {
              return;
          }

          try
          {
              if (subscriptions.Any())
              {
                  await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
              }

              if (unsubscriptions.Any())
              {
                  await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
              }
          }
          catch (Exception exception)
          {
              _logger.Warning(exception, "Synchronizing subscriptions failed.");
              _subscriptionsNotPushed = true;

              SynchronizingSubscriptionsFailed?.Invoke(this, EventArgs.Empty);
          }
      }

      /// <summary>
      /// Connects the wrapped client when it is not connected and reports the outcome.
      /// </summary>
      /// <returns>
      /// <c>StillConnected</c> when no connect was needed, <c>Reconnected</c> when connecting
      /// succeeded and <c>NotConnected</c> when connecting threw.
      /// </returns>
      private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
      {
          if (_mqttClient.IsConnected)
          {
              return ReconnectionResult.StillConnected;
          }

          try
          {
              await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception)
          {
              // A failed attempt is reported through the result; the caller waits and retries.
              return ReconnectionResult.NotConnected;
          }
      }

      private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      {
          ApplicationMessageReceived?.Invoke(this, eventArgs);
      }

      private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
      {
          Disconnected?.Invoke(this, eventArgs);
      }

      private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      {
          Connected?.Invoke(this, eventArgs);
      }

      /// <summary>
      /// Cancels a publisher which may still be running and starts a new one on a background task.
      /// </summary>
      private void StartPublishing()
      {
          StopPublishing();

          var cancellationTokenSource = new CancellationTokenSource();
          _publishingCancellationToken = cancellationTokenSource;

          // Read the token before the task starts; the source may be disposed by then.
          var cancellationToken = cancellationTokenSource.Token;
          _ = Task.Run(() => PublishQueuedMessagesAsync(cancellationToken), cancellationToken);
      }

      /// <summary>Cancels and disposes the publisher's token source, if there is one.</summary>
      private void StopPublishing()
      {
          // This is called from the maintenance loop as well as from StopAsync/Dispose.
          // The exchange ensures that only one caller cancels and disposes a given source.
          var cancellationTokenSource = Interlocked.Exchange(ref _publishingCancellationToken, null);
          if (cancellationTokenSource == null)
          {
              return;
          }

          cancellationTokenSource.Cancel(false);
          cancellationTokenSource.Dispose();
      }

      /// <summary>Cancels and disposes the maintenance loop's token source, if there is one.</summary>
      private void StopMaintainingConnection()
      {
          var cancellationTokenSource = Interlocked.Exchange(ref _connectionCancellationToken, null);
          if (cancellationTokenSource == null)
          {
              return;
          }

          cancellationTokenSource.Cancel(false);
          cancellationTokenSource.Dispose();
      }
  }
  326. }