You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

397 lines
14 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.ManagedClient
  12. {
  /// <summary>
  /// Wraps an <see cref="IMqttClient"/> and keeps it connected from a background loop.
  /// Application messages are placed in an in-memory queue and published by a second
  /// background loop while the client is connected; topic subscriptions are remembered
  /// and pushed to the inner client after every reconnect or whenever they change.
  /// </summary>
  public class ManagedMqttClient : IManagedMqttClient
  {
      private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();

      // Desired subscriptions (topic -> QoS) and topics that still have to be unsubscribed.
      // Both collections and _subscriptionsNotPushed are guarded by _subscriptionsSemaphore.
      private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
      private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);
      private readonly List<string> _unsubscriptions = new List<string>();

      private readonly IMqttClient _mqttClient;
      private readonly IMqttNetLogger _logger;

      private CancellationTokenSource _connectionCancellationToken;
      private CancellationTokenSource _publishingCancellationToken;
      private ManagedMqttClientStorageManager _storageManager;
      private IManagedMqttClientOptions _options;
      private bool _subscriptionsNotPushed;

      /// <summary>
      /// Creates a managed client on top of <paramref name="mqttClient"/> and forwards its
      /// connection and message events through this instance.
      /// </summary>
      /// <param name="mqttClient">The inner client used for all network operations.</param>
      /// <param name="logger">The logger used for diagnostics.</param>
      /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
      public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

          _mqttClient.Connected += OnConnected;
          _mqttClient.Disconnected += OnDisconnected;
          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
      }

      /// <summary>Gets whether the inner client currently reports being connected.</summary>
      public bool IsConnected => _mqttClient.IsConnected;

      /// <summary>Gets whether the connection maintenance loop has been started and not yet stopped.</summary>
      public bool IsStarted => _connectionCancellationToken != null;

      /// <summary>Forwarded from the inner client's <c>Connected</c> event.</summary>
      public event EventHandler<MqttClientConnectedEventArgs> Connected;

      /// <summary>Forwarded from the inner client's <c>Disconnected</c> event.</summary>
      public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

      /// <summary>Forwarded from the inner client's <c>ApplicationMessageReceived</c> event.</summary>
      public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

      /// <summary>Raised after every publish attempt of a queued message, successful or not.</summary>
      public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;

      /// <summary>Raised when pushing the subscriptions to the inner client failed.</summary>
      public event EventHandler SynchronizingSubscriptionsFailed;

      /// <summary>
      /// Validates the options, prepares the optional message storage and starts the
      /// background loop that keeps the inner client connected.
      /// </summary>
      /// <param name="options">The managed client options; <c>ClientOptions</c> must be set and use a clean session.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
      /// <exception cref="ArgumentException">The client options are not set.</exception>
      /// <exception cref="NotSupportedException">The client options do not request a clean session.</exception>
      /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
      public async Task StartAsync(IManagedMqttClientOptions options)
      {
          if (options == null) throw new ArgumentNullException(nameof(options));
          if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

          if (!options.ClientOptions.CleanSession)
          {
              throw new NotSupportedException("The managed client does not support existing sessions.");
          }

          if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

          _options = options;

          if (_options.Storage != null)
          {
              _storageManager = new ManagedMqttClientStorageManager(_options.Storage);
              await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
          }

          // Capture the token in a local: the field may already have been reset to null by
          // StopAsync when the thread pool gets around to running the lambda.
          var cts = new CancellationTokenSource();
          _connectionCancellationToken = cts;
          var cancellationToken = cts.Token;

          // Fire and forget; the loop handles and logs its own exceptions.
          _ = Task.Run(async () => await MaintainConnectionAsync(cancellationToken).ConfigureAwait(false), cancellationToken);

          _logger.Info<ManagedMqttClient>("Started");
      }

      /// <summary>
      /// Cancels the publishing and connection loops and discards all messages that are
      /// still waiting in the in-memory queue.
      /// </summary>
      /// <returns>A completed task.</returns>
      public Task StopAsync()
      {
          StopPublishing();
          StopMaintainingConnection();

          // TryTake never blocks, so the drain cannot hang if another consumer empties
          // the queue between the emptiness check and the removal.
          while (_messageQueue.TryTake(out _))
          {
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Adds the messages to the storage (when configured) and to the publish queue.
      /// </summary>
      /// <param name="applicationMessages">The messages to enqueue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is <c>null</c>.</exception>
      public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
      {
          if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

          foreach (var applicationMessage in applicationMessages)
          {
              if (_storageManager != null)
              {
                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }

              _messageQueue.Add(applicationMessage);
          }
      }

      /// <summary>
      /// Records the topic filters as desired subscriptions; they are sent to the inner
      /// client by the connection maintenance loop.
      /// </summary>
      /// <param name="topicFilters">The topic filters to subscribe to.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
      public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topicFilter in topicFilters)
              {
                  _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;

                  // A pending unsubscribe for the same topic would otherwise be sent right
                  // after the subscribe and silently cancel it.
                  _unsubscriptions.Remove(topicFilter.Topic);

                  _subscriptionsNotPushed = true;
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Removes the topics from the desired subscriptions and schedules an unsubscribe for
      /// every topic that was actually subscribed.
      /// </summary>
      /// <param name="topics">The topics to unsubscribe from.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
      public async Task UnsubscribeAsync(IEnumerable<string> topics)
      {
          if (topics == null) throw new ArgumentNullException(nameof(topics));

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topic in topics)
              {
                  if (_subscriptions.Remove(topic))
                  {
                      _unsubscriptions.Add(topic);
                      _subscriptionsNotPushed = true;
                  }
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Detaches from the inner client's events, cancels both background loops and
      /// releases the queue and the semaphore.
      /// </summary>
      public void Dispose()
      {
          // Detach first so the inner client no longer keeps this instance alive.
          _mqttClient.Connected -= OnConnected;
          _mqttClient.Disconnected -= OnDisconnected;
          _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;

          // Cancel before disposing; disposing a token source alone does not stop the loops.
          StopPublishing();
          StopMaintainingConnection();

          _messageQueue.Dispose();
          _subscriptionsSemaphore.Dispose();
      }

      /// <summary>
      /// Runs the connection maintenance step repeatedly until cancellation is requested,
      /// then disconnects the inner client.
      /// </summary>
      private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
          finally
          {
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  // Nobody observes this task, so an exception escaping here would be lost
                  // and the "Stopped" entry below would never be written.
                  _logger.Warning<ManagedMqttClient>(exception, "Disconnecting the client while stopping failed.");
              }

              _logger.Info<ManagedMqttClient>("Stopped");
          }
      }

      /// <summary>
      /// Performs one maintenance step: reconnects when required, synchronizes the
      /// subscriptions and (re)starts publishing, or simply waits when nothing is to do.
      /// Exceptions are logged and never propagate to the caller.
      /// </summary>
      private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
              if (connectionState == ReconnectionResult.NotConnected)
              {
                  StopPublishing();
                  await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  return;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await SynchronizeSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  return;
              }

              if (connectionState == ReconnectionResult.StillConnected)
              {
                  await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // The caller's loop re-checks the token and exits.
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Communication exception while maintaining connection.");
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
      }

      /// <summary>
      /// Takes messages from the queue and publishes them one by one until cancellation
      /// is requested.
      /// </summary>
      private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  var message = _messageQueue.Take(cancellationToken);
                  if (message == null)
                  {
                      continue;
                  }

                  // NOTE(review): a message dequeued just before cancellation is dropped
                  // here without a publish attempt; confirm this is acceptable for QoS > 0.
                  if (cancellationToken.IsCancellationRequested)
                  {
                      continue;
                  }

                  await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application messages.");
          }
          finally
          {
              _logger.Verbose<ManagedMqttClient>("Stopped publishing messages.");
          }
      }

      /// <summary>
      /// Publishes a single message through the inner client and removes it from the
      /// storage on success. After a communication failure, messages with a QoS above
      /// "at most once" are put back into the queue. The
      /// <see cref="ApplicationMessageProcessed"/> event is raised in every case.
      /// </summary>
      private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
      {
          Exception transmitException = null;
          try
          {
              await _mqttClient.PublishAsync(message).ConfigureAwait(false);

              if (_storageManager != null)
              {
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
          catch (MqttCommunicationException exception)
          {
              transmitException = exception;

              _logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed.");

              if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  _messageQueue.Add(message);
              }
          }
          catch (Exception exception)
          {
              transmitException = exception;
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message.");
          }
          finally
          {
              ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
          }
      }

      /// <summary>
      /// Sends all desired subscriptions and all pending unsubscriptions to the inner
      /// client. On failure the pending unsubscriptions are restored so that the next
      /// attempt sends them again, and <see cref="SynchronizingSubscriptionsFailed"/> is raised.
      /// </summary>
      private async Task SynchronizeSubscriptionsAsync()
      {
          _logger.Info<ManagedMqttClient>("Synchronizing subscriptions");

          List<TopicFilter> subscriptions;
          List<string> unsubscriptions;

          // Take a snapshot under the semaphore so the network calls run without holding it.
          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();

              unsubscriptions = new List<string>(_unsubscriptions);
              _unsubscriptions.Clear();

              _subscriptionsNotPushed = false;
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }

          if (!subscriptions.Any() && !unsubscriptions.Any())
          {
              return;
          }

          try
          {
              if (subscriptions.Any())
              {
                  await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
              }

              if (unsubscriptions.Any())
              {
                  await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
              }
          }
          catch (Exception exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Synchronizing subscriptions failed.");

              await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
              try
              {
                  // The snapshot was removed from _unsubscriptions above; put it back so the
                  // retry still unsubscribes. Topics subscribed again in the meantime are skipped.
                  foreach (var topic in unsubscriptions)
                  {
                      if (!_subscriptions.ContainsKey(topic) && !_unsubscriptions.Contains(topic))
                      {
                          _unsubscriptions.Add(topic);
                      }
                  }

                  _subscriptionsNotPushed = true;
              }
              finally
              {
                  _subscriptionsSemaphore.Release();
              }

              SynchronizingSubscriptionsFailed?.Invoke(this, EventArgs.Empty);
          }
      }

      /// <summary>
      /// Connects the inner client when it is not connected.
      /// </summary>
      /// <returns>
      /// <c>StillConnected</c> when no connect was needed, <c>Reconnected</c> when the
      /// connect succeeded and <c>NotConnected</c> when it threw.
      /// </returns>
      private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
      {
          if (_mqttClient.IsConnected)
          {
              return ReconnectionResult.StillConnected;
          }

          try
          {
              await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception)
          {
              // Reported through the return value; the caller waits and tries again.
              return ReconnectionResult.NotConnected;
          }
      }

      /// <summary>Forwards a received application message to this instance's subscribers.</summary>
      private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      {
          ApplicationMessageReceived?.Invoke(this, eventArgs);
      }

      /// <summary>Forwards the inner client's disconnect notification.</summary>
      private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
      {
          Disconnected?.Invoke(this, eventArgs);
      }

      /// <summary>Forwards the inner client's connect notification.</summary>
      private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      {
          Connected?.Invoke(this, eventArgs);
      }

      /// <summary>
      /// Stops a running publishing loop, if any, and starts a new one with its own
      /// cancellation token source.
      /// </summary>
      private void StartPublishing()
      {
          StopPublishing();

          var cts = new CancellationTokenSource();
          _publishingCancellationToken = cts;
          var cancellationToken = cts.Token;

          // Fire and forget; the loop handles and logs its own exceptions.
          _ = Task.Run(async () => await PublishQueuedMessagesAsync(cancellationToken).ConfigureAwait(false), cancellationToken);
      }

      /// <summary>Cancels and disposes the token source of the publishing loop, if one exists.</summary>
      private void StopPublishing()
      {
          // Swap the field atomically so that concurrent callers (StopAsync and the
          // maintenance loop) never cancel a source another caller has already disposed.
          var cts = Interlocked.Exchange(ref _publishingCancellationToken, null);
          if (cts == null)
          {
              return;
          }

          cts.Cancel(false);
          cts.Dispose();
      }

      /// <summary>Cancels and disposes the token source of the connection loop, if one exists.</summary>
      private void StopMaintainingConnection()
      {
          // Same atomic swap as in StopPublishing.
          var cts = Interlocked.Exchange(ref _connectionCancellationToken, null);
          if (cts == null)
          {
              return;
          }

          cts.Cancel(false);
          cts.Dispose();
      }
  }
  335. }