You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

380 lines
13 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.ManagedClient
  12. {
  /// <summary>
  /// Wraps an <see cref="IMqttClient"/> and keeps it connected in the background.
  /// Application messages are queued and published by a background loop, and the set of
  /// subscriptions is tracked locally and pushed to the broker after every (re)connect.
  /// </summary>
  public class ManagedMqttClient : IManagedMqttClient
  {
      // Outgoing messages waiting to be published by the background publishing loop.
      private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();

      // Desired subscriptions (topic -> QoS). Guarded by _subscriptionsSemaphore.
      private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
      private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);

      // Topics removed since the last push. Guarded by _subscriptionsSemaphore.
      private readonly List<string> _unsubscriptions = new List<string>();

      private readonly IMqttClient _mqttClient;
      private readonly IMqttNetLogger _logger;

      private CancellationTokenSource _connectionCancellationToken;
      private CancellationTokenSource _publishingCancellationToken;
      private ManagedMqttClientStorageManager _storageManager;
      private IManagedMqttClientOptions _options;

      // Set whenever the local subscription state differs from what was last sent to the broker.
      private bool _subscriptionsNotPushed;

      /// <summary>
      /// Creates a managed client on top of the given client and hooks its events.
      /// </summary>
      /// <param name="mqttClient">The underlying client used for all communication.</param>
      /// <param name="logger">The logger used for diagnostics.</param>
      /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
      public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

          _mqttClient.Connected += OnConnected;
          _mqttClient.Disconnected += OnDisconnected;
          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
      }

      /// <summary>Gets whether the underlying client is currently connected.</summary>
      public bool IsConnected => _mqttClient.IsConnected;

      /// <summary>Forwarded from the underlying client's <c>Connected</c> event.</summary>
      public event EventHandler<MqttClientConnectedEventArgs> Connected;

      /// <summary>Forwarded from the underlying client's <c>Disconnected</c> event.</summary>
      public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

      /// <summary>Forwarded from the underlying client's <c>ApplicationMessageReceived</c> event.</summary>
      public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

      /// <summary>
      /// Stores the options, initializes the optional message storage and starts the
      /// background loop which maintains the connection.
      /// </summary>
      /// <param name="options">The managed client options; <c>ClientOptions</c> must be set and use a clean session.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
      /// <exception cref="ArgumentException">The client options are not set.</exception>
      /// <exception cref="NotSupportedException">The client options do not request a clean session.</exception>
      /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
      public async Task StartAsync(IManagedMqttClientOptions options)
      {
          if (options == null) throw new ArgumentNullException(nameof(options));
          if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

          if (!options.ClientOptions.CleanSession)
          {
              throw new NotSupportedException("The managed client does not support existing sessions.");
          }

          if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

          _options = options;

          if (_options.Storage != null)
          {
              _storageManager = new ManagedMqttClientStorageManager(_options.Storage);

              // NOTE(review): the result of this call is not used here; whether the loaded
              // messages ever reach _messageQueue depends on the storage manager - confirm.
              await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
          }

          // Capture the token in a local so the background task never re-reads the field,
          // which a concurrent StopAsync may already have disposed and set to null.
          var cancellationTokenSource = new CancellationTokenSource();
          _connectionCancellationToken = cancellationTokenSource;
          var cancellationToken = cancellationTokenSource.Token;

          #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
          Task.Run(async () => await MaintainConnectionAsync(cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
          #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

          _logger.Info<ManagedMqttClient>("Started");
      }

      /// <summary>
      /// Cancels the publishing and connection loops and discards all queued messages.
      /// </summary>
      /// <returns>An already completed task.</returns>
      public Task StopAsync()
      {
          StopPublishing();
          StopMaintainingConnection();

          // TryTake never blocks, unlike Any() + Take() which can wait forever when another
          // consumer removes the last item between the two calls.
          while (_messageQueue.TryTake(out _))
          {
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Adds the messages to the storage (when configured) and to the outgoing queue.
      /// </summary>
      /// <param name="applicationMessages">The messages to enqueue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is <c>null</c>.</exception>
      public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
      {
          if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

          foreach (var applicationMessage in applicationMessages)
          {
              if (_storageManager != null)
              {
                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }

              _messageQueue.Add(applicationMessage);
          }
      }

      /// <summary>
      /// Records the topic filters locally and marks the subscriptions as pending so the
      /// connection loop pushes them to the broker.
      /// </summary>
      /// <param name="topicFilters">The topic filters to subscribe to.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
      public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topicFilter in topicFilters)
              {
                  _subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
                  _subscriptionsNotPushed = true;
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Removes the topics from the local subscriptions. Topics which were actually
      /// subscribed are remembered so the connection loop unsubscribes them at the broker.
      /// </summary>
      /// <param name="topics">The topics to unsubscribe from.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topics"/> is <c>null</c>.</exception>
      public async Task UnsubscribeAsync(IEnumerable<string> topics)
      {
          // Validate before taking the semaphore, matching SubscribeAsync.
          if (topics == null) throw new ArgumentNullException(nameof(topics));

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topic in topics)
              {
                  if (_subscriptions.Remove(topic))
                  {
                      _unsubscriptions.Add(topic);
                      _subscriptionsNotPushed = true;
                  }
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Detaches from the underlying client's events, cancels the background loops and
      /// releases the queue, the semaphore and the cancellation token sources.
      /// </summary>
      public void Dispose()
      {
          // Detach first so the underlying client no longer keeps this instance alive.
          _mqttClient.Connected -= OnConnected;
          _mqttClient.Disconnected -= OnDisconnected;
          _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;

          // Cancel (not only dispose) the token sources; otherwise the background loops
          // would keep running against disposed resources.
          StopPublishing();
          StopMaintainingConnection();

          _messageQueue.Dispose();
          _subscriptionsSemaphore.Dispose();
      }

      /// <summary>
      /// Runs the connection maintenance loop until cancellation is requested and
      /// disconnects the underlying client when the loop ends.
      /// </summary>
      private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
          finally
          {
              // This method runs as a fire-and-forget task, so a failing disconnect must not
              // escape as an unobserved exception or suppress the final log entry.
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  _logger.Warning<ManagedMqttClient>(exception, "Disconnecting the client while stopping failed.");
              }

              _logger.Info<ManagedMqttClient>("Stopped");
          }
      }

      /// <summary>
      /// Performs one maintenance step: reconnects when required, pushes pending
      /// subscriptions and starts or stops the publishing loop accordingly.
      /// Exceptions are logged and never propagate to the caller.
      /// </summary>
      private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
              if (connectionState == ReconnectionResult.NotConnected)
              {
                  StopPublishing();
                  await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  return;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await PushSubscriptionsAsync().ConfigureAwait(false);
                  StartPublishing();
                  return;
              }

              if (connectionState == ReconnectionResult.StillConnected)
              {
                  await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // The enclosing loop checks the token and terminates.
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Communication exception while maintaining connection.");
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
      }

      /// <summary>
      /// Takes messages from the queue and publishes them until cancellation is requested.
      /// </summary>
      private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  var message = _messageQueue.Take(cancellationToken);
                  if (message == null)
                  {
                      continue;
                  }

                  // NOTE(review): a message taken right before cancellation is neither
                  // published nor re-queued here.
                  if (cancellationToken.IsCancellationRequested)
                  {
                      continue;
                  }

                  await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application messages.");
          }
          finally
          {
              _logger.Trace<ManagedMqttClient>("Stopped publishing messages.");
          }
      }

      /// <summary>
      /// Publishes a single message and removes it from the storage on success. After a
      /// communication failure, messages with a QoS above "at most once" are queued again.
      /// </summary>
      private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
      {
          try
          {
              await _mqttClient.PublishAsync(message).ConfigureAwait(false);

              if (_storageManager != null)
              {
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed.");

              if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  _messageQueue.Add(message);
              }
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message.");
          }
      }

      /// <summary>
      /// Sends the pending unsubscriptions and the complete set of subscriptions to the
      /// broker. A failed unsubscribe call is rethrown after the pending topics were
      /// restored; a failed subscribe call is logged and retried on the next step.
      /// </summary>
      private async Task PushSubscriptionsAsync()
      {
          // The message must be the first argument; the original passed the class name as
          // the message and the actual text as a format parameter.
          _logger.Info<ManagedMqttClient>("Synchronizing subscriptions");

          List<TopicFilter> subscriptions;
          List<string> unsubscriptions;

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              // Snapshot the state so the broker calls run without holding the semaphore.
              subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList();
              unsubscriptions = new List<string>(_unsubscriptions);
              _unsubscriptions.Clear();
              _subscriptionsNotPushed = false;
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }

          if (unsubscriptions.Any())
          {
              try
              {
                  await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
              }
              catch (Exception)
              {
                  // The pending list was already cleared above; put the topics back so
                  // they are not silently lost and get retried.
                  await RestoreUnsubscriptionsAsync(unsubscriptions).ConfigureAwait(false);
                  throw;
              }
          }

          if (!subscriptions.Any())
          {
              return;
          }

          try
          {
              await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Synchronizing subscriptions failed");
              _subscriptionsNotPushed = true;
          }
      }

      /// <summary>
      /// Re-adds topics whose unsubscribe call failed and marks the subscriptions as pending.
      /// </summary>
      private async Task RestoreUnsubscriptionsAsync(List<string> topics)
      {
          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              _unsubscriptions.AddRange(topics);
              _subscriptionsNotPushed = true;
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Connects the underlying client when it is not connected.
      /// </summary>
      /// <returns>
      /// <c>StillConnected</c> when no action was required, <c>Reconnected</c> after a
      /// successful connect, or <c>NotConnected</c> when connecting threw an exception.
      /// </returns>
      private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
      {
          if (_mqttClient.IsConnected)
          {
              return ReconnectionResult.StillConnected;
          }

          try
          {
              await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception)
          {
              // Any failure is reported as "not connected"; the caller retries after a delay.
              return ReconnectionResult.NotConnected;
          }
      }

      private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      {
          ApplicationMessageReceived?.Invoke(this, eventArgs);
      }

      private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
      {
          Disconnected?.Invoke(this, eventArgs);
      }

      private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      {
          Connected?.Invoke(this, eventArgs);
      }

      /// <summary>
      /// Starts a new background publishing loop, stopping a previously started one first.
      /// </summary>
      private void StartPublishing()
      {
          if (_publishingCancellationToken != null)
          {
              StopPublishing();
          }

          var cts = new CancellationTokenSource();
          _publishingCancellationToken = cts;

          #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
          Task.Run(async () => await PublishQueuedMessagesAsync(cts.Token).ConfigureAwait(false), cts.Token).ConfigureAwait(false);
          #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
      }

      /// <summary>Cancels and disposes the current publishing token source, if any.</summary>
      private void StopPublishing()
      {
          // This is called from the caller's thread (StopAsync/Dispose) and from the
          // connection loop. Swapping the field atomically ensures only one of them
          // cancels and disposes a given instance.
          var cts = Interlocked.Exchange(ref _publishingCancellationToken, null);
          cts?.Cancel(false);
          cts?.Dispose();
      }

      /// <summary>Cancels and disposes the current connection token source, if any.</summary>
      private void StopMaintainingConnection()
      {
          var cts = Interlocked.Exchange(ref _connectionCancellationToken, null);
          cts?.Cancel(false);
          cts?.Dispose();
      }
  }
  321. }