Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

306 řádky
11 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Core.Client;
  8. using MQTTnet.Core.Diagnostics;
  9. using MQTTnet.Core.Exceptions;
  10. using MQTTnet.Core.Packets;
  11. using MQTTnet.Core.Protocol;
  12. namespace MQTTnet.Core.ManagedClient
  13. {
  14. public class ManagedMqttClient : IApplicationMessageReceiver
  15. {
  16. private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();
  17. private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
  18. private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();
  19. private readonly IMqttClient _mqttClient;
  20. private readonly MqttNetTrace _trace;
  21. private CancellationTokenSource _connectionCancellationToken;
  22. private CancellationTokenSource _publishingCancellationToken;
  23. private IManagedMqttClientOptions _options;
  24. private bool _subscriptionsNotPushed;
  25. public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace)
  26. {
  27. if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory));
  28. _trace = trace ?? throw new ArgumentNullException(nameof(trace));
  29. _mqttClient = new MqttClient(communicationChannelFactory, _trace);
  30. _mqttClient.Connected += OnConnected;
  31. _mqttClient.Disconnected += OnDisconnected;
  32. _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
  33. }
  34. public bool IsConnected => _mqttClient.IsConnected;
  35. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  36. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  37. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  38. public async Task StartAsync(IManagedMqttClientOptions options)
  39. {
  40. if (options == null) throw new ArgumentNullException(nameof(options));
  41. if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
  42. if (!options.ClientOptions.CleanSession)
  43. {
  44. throw new NotSupportedException("The managed client does not support existing sessions.");
  45. }
  46. if (_connectionCancellationToken != null)
  47. {
  48. throw new InvalidOperationException("The managed client is already started.");
  49. }
  50. _options = options;
  51. await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);
  52. if (_options.Storage != null)
  53. {
  54. var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
  55. foreach (var loadedMessage in loadedMessages)
  56. {
  57. _messageQueue.Add(loadedMessage);
  58. }
  59. }
  60. _connectionCancellationToken = new CancellationTokenSource();
  61. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  62. Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
  63. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  64. _trace.Information(nameof(ManagedMqttClient), "Started");
  65. }
  66. public Task StopAsync()
  67. {
  68. _connectionCancellationToken?.Cancel(false);
  69. _connectionCancellationToken = null;
  70. while (_messageQueue.Any())
  71. {
  72. _messageQueue.Take();
  73. }
  74. return Task.FromResult(0);
  75. }
  76. public Task EnqueueAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  77. {
  78. if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));
  79. foreach (var applicationMessage in applicationMessages)
  80. {
  81. _messageQueue.Add(applicationMessage);
  82. }
  83. return Task.FromResult(0);
  84. }
  85. public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  86. {
  87. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  88. lock (_subscriptions)
  89. {
  90. foreach (var topicFilter in topicFilters)
  91. {
  92. if (_subscriptions.Add(topicFilter))
  93. {
  94. _subscriptionsNotPushed = true;
  95. }
  96. }
  97. }
  98. return Task.FromResult(0);
  99. }
  100. public Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  101. {
  102. lock (_subscriptions)
  103. {
  104. foreach (var topicFilter in topicFilters)
  105. {
  106. if (_subscriptions.Remove(topicFilter))
  107. {
  108. _subscriptionsNotPushed = true;
  109. }
  110. }
  111. }
  112. return Task.FromResult(0);
  113. }
  114. private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
  115. {
  116. try
  117. {
  118. while (!cancellationToken.IsCancellationRequested)
  119. {
  120. var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
  121. if (connectionState == ReconnectionResult.NotConnected)
  122. {
  123. _publishingCancellationToken?.Cancel(false);
  124. _publishingCancellationToken = null;
  125. await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
  126. continue;
  127. }
  128. if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
  129. {
  130. await PushSubscriptionsAsync();
  131. _publishingCancellationToken = new CancellationTokenSource();
  132. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  133. Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
  134. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  135. continue;
  136. }
  137. if (connectionState == ReconnectionResult.StillConnected)
  138. {
  139. await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource)
  140. }
  141. }
  142. }
  143. catch (OperationCanceledException)
  144. {
  145. }
  146. catch (MqttCommunicationException exception)
  147. {
  148. _trace.Warning(nameof(ManagedMqttClient), exception, "Communication exception while maintaining connection.");
  149. }
  150. catch (Exception exception)
  151. {
  152. _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while maintaining connection.");
  153. }
  154. finally
  155. {
  156. await _mqttClient.DisconnectAsync().ConfigureAwait(false);
  157. _trace.Information(nameof(ManagedMqttClient), "Stopped");
  158. }
  159. }
  160. private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
  161. {
  162. try
  163. {
  164. while (!cancellationToken.IsCancellationRequested)
  165. {
  166. var message = _messageQueue.Take(cancellationToken);
  167. if (message == null)
  168. {
  169. continue;
  170. }
  171. if (cancellationToken.IsCancellationRequested)
  172. {
  173. continue;
  174. }
  175. await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
  176. }
  177. }
  178. catch (OperationCanceledException)
  179. {
  180. }
  181. finally
  182. {
  183. _trace.Information(nameof(ManagedMqttClient), "Stopped publishing messages");
  184. }
  185. }
  186. private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
  187. {
  188. try
  189. {
  190. await _mqttClient.PublishAsync(message).ConfigureAwait(false);
  191. }
  192. catch (MqttCommunicationException exception)
  193. {
  194. _trace.Warning(nameof(ManagedMqttClient), exception, "Publishing application message failed.");
  195. if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  196. {
  197. _messageQueue.Add(message);
  198. }
  199. }
  200. catch (Exception exception)
  201. {
  202. _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while publishing queued application message.");
  203. }
  204. }
  205. private async Task PushSubscriptionsAsync()
  206. {
  207. _trace.Information(nameof(ManagedMqttClient), "Synchronizing subscriptions");
  208. List<TopicFilter> subscriptions;
  209. lock (_subscriptions)
  210. {
  211. subscriptions = _subscriptions.ToList();
  212. _subscriptionsNotPushed = false;
  213. }
  214. if (!_subscriptions.Any())
  215. {
  216. return;
  217. }
  218. try
  219. {
  220. await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
  221. }
  222. catch (Exception exception)
  223. {
  224. _trace.Warning(nameof(ManagedMqttClient), exception, "Synchronizing subscriptions failed");
  225. _subscriptionsNotPushed = true;
  226. }
  227. }
  228. private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
  229. {
  230. if (_mqttClient.IsConnected)
  231. {
  232. return ReconnectionResult.StillConnected;
  233. }
  234. try
  235. {
  236. await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
  237. return ReconnectionResult.Reconnected;
  238. }
  239. catch (Exception)
  240. {
  241. return ReconnectionResult.NotConnected;
  242. }
  243. }
  244. private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
  245. {
  246. ApplicationMessageReceived?.Invoke(this, eventArgs);
  247. }
  248. private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
  249. {
  250. Disconnected?.Invoke(this, eventArgs);
  251. }
  252. private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
  253. {
  254. Connected?.Invoke(this, eventArgs);
  255. }
  256. }
  257. }