選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
 
 
 
 

340 行
12 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Client;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.ManagedClient
  12. {
  /// <summary>
  /// Wraps an <see cref="IMqttClient"/> and keeps it connected from a background loop.
  /// Outgoing application messages are buffered in an in-memory queue and published by a
  /// background task while the connection is up; topic subscriptions are tracked locally
  /// and pushed to the underlying client after every (re)connect or local change.
  /// </summary>
  public class ManagedMqttClient : IManagedMqttClient
  {
      // Pause between two connection checks while the client is still connected.
      private static readonly TimeSpan ConnectionCheckInterval = TimeSpan.FromSeconds(1);

      private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
      private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();

      // Guards _subscriptions; a SemaphoreSlim is used because it can be awaited.
      private readonly SemaphoreSlim _subscriptionsSemaphore = new SemaphoreSlim(1, 1);

      private readonly IMqttClient _mqttClient;
      private readonly IMqttNetLogger _logger;

      // Non-null while the managed client is started (see StartAsync / StopAsync).
      private CancellationTokenSource _connectionCancellationToken;

      // Non-null while a background publisher task is supposed to be running.
      private CancellationTokenSource _publishingCancellationToken;

      private ManagedMqttClientStorageManager _storageManager;
      private IManagedMqttClientOptions _options;

      // Set whenever the local subscription set differs from what was last pushed.
      private bool _subscriptionsNotPushed;

      /// <summary>
      /// Creates a managed client on top of <paramref name="mqttClient"/> and forwards its
      /// Connected, Disconnected and ApplicationMessageReceived events.
      /// </summary>
      /// <param name="mqttClient">The underlying client used for all network operations.</param>
      /// <param name="logger">The logger used for diagnostics.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

          _mqttClient.Connected += OnConnected;
          _mqttClient.Disconnected += OnDisconnected;
          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
      }

      /// <summary>Gets whether the underlying client currently reports being connected.</summary>
      public bool IsConnected => _mqttClient.IsConnected;

      /// <summary>Raised when the underlying client raises its Connected event.</summary>
      public event EventHandler<MqttClientConnectedEventArgs> Connected;

      /// <summary>Raised when the underlying client raises its Disconnected event.</summary>
      public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

      /// <summary>Raised when the underlying client raises its ApplicationMessageReceived event.</summary>
      public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

      /// <summary>
      /// Stores the options, initializes the optional message storage and starts the
      /// background loop which connects and keeps the connection alive.
      /// </summary>
      /// <param name="options">The managed client options; <c>ClientOptions</c> must be set and use a clean session.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
      /// <exception cref="ArgumentException"><c>options.ClientOptions</c> is <c>null</c>.</exception>
      /// <exception cref="NotSupportedException"><c>CleanSession</c> is disabled in the client options.</exception>
      /// <exception cref="InvalidOperationException">The managed client is already started.</exception>
      public async Task StartAsync(IManagedMqttClientOptions options)
      {
          if (options == null) throw new ArgumentNullException(nameof(options));
          if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));

          if (!options.ClientOptions.CleanSession)
          {
              throw new NotSupportedException("The managed client does not support existing sessions.");
          }

          if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

          _options = options;

          if (_options.Storage != null)
          {
              _storageManager = new ManagedMqttClientStorageManager(_options.Storage);

              // NOTE(review): nothing returned by this call is added to _messageQueue here;
              // confirm that previously stored messages are re-queued somewhere.
              await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);
          }

          // Capture the token in a local: StopAsync resets the field to null, so reading the
          // field from inside the background lambda could throw a NullReferenceException.
          var connectionCancellation = new CancellationTokenSource();
          _connectionCancellationToken = connectionCancellation;
          var cancellationToken = connectionCancellation.Token;

          // Fire and forget; MaintainConnectionAsync handles and logs its own exceptions.
          _ = Task.Run(async () => await MaintainConnectionAsync(cancellationToken).ConfigureAwait(false), cancellationToken);

          _logger.Info<ManagedMqttClient>("Started");
      }

      /// <summary>
      /// Signals the background connection loop and the publisher to stop and discards all
      /// messages which are still waiting in the in-memory queue.
      /// </summary>
      /// <returns>An already completed task.</returns>
      public Task StopAsync()
      {
          _connectionCancellationToken?.Cancel(false);
          _connectionCancellationToken = null;

          StopPublishing();

          // TryTake never blocks. The previous Any() + Take() pair could wait forever when
          // the publisher removed the last message between the two calls.
          while (_messageQueue.TryTake(out _))
          {
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Queues the given messages for publishing. When a storage is configured, every
      /// message is handed to the storage manager before it is queued.
      /// </summary>
      /// <param name="applicationMessages">The messages to publish.</param>
      /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is <c>null</c>.</exception>
      public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
      {
          if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

          foreach (var applicationMessage in applicationMessages)
          {
              if (_storageManager != null)
              {
                  await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
              }

              _messageQueue.Add(applicationMessage);
          }
      }

      /// <summary>
      /// Adds the given topic filters to the local subscription set. The connection loop
      /// pushes the set to the underlying client once it notices the change.
      /// </summary>
      /// <param name="topicFilters">The topic filters to subscribe to.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
      public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topicFilter in topicFilters)
              {
                  if (_subscriptions.Add(topicFilter))
                  {
                      _subscriptionsNotPushed = true;
                  }
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Removes the given topic filters from the local subscription set.
      /// </summary>
      /// <param name="topicFilters">The topic filters to remove.</param>
      /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is <c>null</c>.</exception>
      public async Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilters)
      {
          // Validate up front, like SubscribeAsync, instead of failing with a
          // NullReferenceException while the semaphore is held.
          if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

          // NOTE(review): PushSubscriptionsAsync only calls SubscribeAsync with the remaining
          // filters; no unsubscribe call is made on the underlying client in this class, so a
          // removed filter presumably stays active on the broker until the next reconnect.
          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              foreach (var topicFilter in topicFilters)
              {
                  if (_subscriptions.Remove(topicFilter))
                  {
                      _subscriptionsNotPushed = true;
                  }
              }
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }
      }

      /// <summary>
      /// Background loop: runs connection maintenance steps until cancellation is requested
      /// and disconnects the underlying client when the loop ends.
      /// </summary>
      private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await TryMaintainConnectionAsync(cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when the managed client is stopped.
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
          finally
          {
              try
              {
                  await _mqttClient.DisconnectAsync().ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  // This method runs fire-and-forget, so an exception escaping here would go
                  // unobserved and the "Stopped" entry below would never be written.
                  _logger.Warning<ManagedMqttClient>(exception, "Disconnecting failed while stopping.");
              }

              _logger.Info<ManagedMqttClient>("Stopped");
          }
      }

      /// <summary>
      /// Performs one maintenance step: reconnects if required, pushes subscriptions and
      /// (re)starts the publisher, or waits before the next check. Exceptions are logged
      /// and not propagated.
      /// </summary>
      private async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
      {
          try
          {
              var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
              if (connectionState == ReconnectionResult.NotConnected)
              {
                  StopPublishing();
                  await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
                  return;
              }

              if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
              {
                  await PushSubscriptionsAsync().ConfigureAwait(false);

                  // Narrows the window in which a publisher could be started after StopAsync
                  // was called while the subscriptions were being pushed.
                  cancellationToken.ThrowIfCancellationRequested();

                  StartPublishing();
                  return;
              }

              if (connectionState == ReconnectionResult.StillConnected)
              {
                  // Use the token passed in; the field is reset to null by StopAsync.
                  await Task.Delay(ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when the managed client is stopped; the caller's loop checks the token.
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Communication exception while maintaining connection.");
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while maintaining connection.");
          }
      }

      /// <summary>
      /// Cancels a previously started publisher (if any) and starts a new background task
      /// which publishes queued messages.
      /// </summary>
      private void StartPublishing()
      {
          // Without this, every subscription change while connected would add another
          // publisher whose cancellation source is no longer reachable.
          StopPublishing();

          var publishingCancellation = new CancellationTokenSource();
          _publishingCancellationToken = publishingCancellation;

          // Captured locally so the lambda does not read the (possibly reset) field.
          var cancellationToken = publishingCancellation.Token;

          _ = Task.Run(async () => await PublishQueuedMessagesAsync(cancellationToken).ConfigureAwait(false), cancellationToken);
      }

      /// <summary>Requests cancellation of the current publisher (if any) and forgets it.</summary>
      private void StopPublishing()
      {
          _publishingCancellationToken?.Cancel(false);
          _publishingCancellationToken = null;
      }

      /// <summary>
      /// Takes messages from the queue and publishes them one by one until cancellation is
      /// requested. Exceptions are logged and end the loop.
      /// </summary>
      private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  // Blocks until a message is available or cancellation is requested.
                  var message = _messageQueue.Take(cancellationToken);
                  if (message == null)
                  {
                      continue;
                  }

                  if (cancellationToken.IsCancellationRequested)
                  {
                      // NOTE(review): the message taken above is neither published nor put
                      // back into the queue on this path - confirm that this is intended.
                      continue;
                  }

                  await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when publishing is stopped while waiting for a message.
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application messages.");
          }
          finally
          {
              _logger.Trace<ManagedMqttClient>("Stopped publishing messages.");
          }
      }

      /// <summary>
      /// Publishes a single message and removes it from the storage (if configured) on
      /// success. After a communication failure, messages with a quality of service level
      /// above AtMostOnce are put back into the queue. Exceptions are logged, not propagated.
      /// </summary>
      private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
      {
          try
          {
              await _mqttClient.PublishAsync(message).ConfigureAwait(false);

              if (_storageManager != null)
              {
                  await _storageManager.RemoveAsync(message).ConfigureAwait(false);
              }
          }
          catch (MqttCommunicationException exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Publishing application message failed.");

              if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  _messageQueue.Add(message);
              }
          }
          catch (Exception exception)
          {
              _logger.Error<ManagedMqttClient>(exception, "Unhandled exception while publishing queued application message.");
          }
      }

      /// <summary>
      /// Sends a snapshot of the local subscription set to the underlying client. When the
      /// call fails, the set is marked as not pushed again so a later step retries.
      /// </summary>
      private async Task PushSubscriptionsAsync()
      {
          // The text is passed as the message itself, like every other log call in this class;
          // previously it was passed in the parameter position after nameof(ManagedMqttClient).
          _logger.Info<ManagedMqttClient>("Synchronizing subscriptions");

          List<TopicFilter> subscriptions;
          await _subscriptionsSemaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              // Copy under the semaphore so the network call below runs without holding it.
              subscriptions = _subscriptions.ToList();
              _subscriptionsNotPushed = false;
          }
          finally
          {
              _subscriptionsSemaphore.Release();
          }

          if (!subscriptions.Any())
          {
              return;
          }

          try
          {
              await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              _logger.Warning<ManagedMqttClient>(exception, "Synchronizing subscriptions failed");
              _subscriptionsNotPushed = true;
          }
      }

      /// <summary>
      /// Connects the underlying client when it is not connected.
      /// </summary>
      /// <returns>
      /// <c>StillConnected</c> when no connect was needed, <c>Reconnected</c> when connecting
      /// succeeded, otherwise <c>NotConnected</c>.
      /// </returns>
      private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
      {
          if (_mqttClient.IsConnected)
          {
              return ReconnectionResult.StillConnected;
          }

          try
          {
              await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
              return ReconnectionResult.Reconnected;
          }
          catch (Exception exception)
          {
              // Still best effort (the caller retries after a delay), but no longer silent.
              _logger.Warning<ManagedMqttClient>(exception, "Connecting failed.");
              return ReconnectionResult.NotConnected;
          }
      }

      // Forwards the underlying client's event with this instance as the sender.
      private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      {
          ApplicationMessageReceived?.Invoke(this, eventArgs);
      }

      // Forwards the underlying client's event with this instance as the sender.
      private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
      {
          Disconnected?.Invoke(this, eventArgs);
      }

      // Forwards the underlying client's event with this instance as the sender.
      private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
      {
          Connected?.Invoke(this, eventArgs);
      }
  }
  289. }