Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

549 строк
21 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Adapter;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Packets;
  12. using MQTTnet.Protocol;
  13. namespace MQTTnet.Client
  14. {
  15. public class MqttClient : IMqttClient
  16. {
  // Collaborators handed in through the constructor.
  private readonly IMqttClientAdapterFactory _adapterFactory;
  private readonly IMqttNetChildLogger _logger;

  // Helpers owned by this client instance; the identifier provider and the
  // dispatcher are reset at the start of every ConnectAsync call.
  private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();

  // Guards the cancellation step performed by InitiateDisconnectAsync.
  private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);

  // Restarted whenever packets are sent; the keep-alive loop reads Elapsed
  // to decide when the next ping is due.
  private readonly Stopwatch _sendTracker = new Stopwatch();

  // Per-connection state: assigned in ConnectAsync, cleared in DisconnectInternalAsync.
  private IMqttChannelAdapter _adapter;
  private IMqttClientOptions _options;
  private CancellationTokenSource _cancellationTokenSource;
  private Task _packetReceiverTask;
  private Task _keepAliveMessageSenderTask;

  // Set by DisconnectAsync; while true the background loops exit silently
  // instead of reporting their exception.
  private bool _cleanDisconnectInitiated;
  /// <summary>
  /// Creates a client that obtains its channel adapter from <paramref name="channelFactory"/>
  /// and logs through a child logger named after this class.
  /// </summary>
  /// <param name="channelFactory">Factory used by ConnectAsync to create the channel adapter.</param>
  /// <param name="logger">Parent logger from which the child logger is created.</param>
  /// <exception cref="ArgumentNullException">Either argument is null.</exception>
  public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
  {
      // The logger is validated first, matching the original check order.
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (channelFactory == null)
      {
          throw new ArgumentNullException(nameof(channelFactory));
      }

      _adapterFactory = channelFactory;
      _logger = logger.CreateChildLogger(nameof(MqttClient));
  }
  /// <summary>
  /// True between a successful ConnectAsync and the next teardown
  /// (DisconnectInternalAsync resets it to false).
  /// </summary>
  public bool IsConnected { get; private set; }

  /// <summary>Raised by ConnectAsync once the MQTT handshake has succeeded.</summary>
  public event EventHandler<MqttClientConnectedEventArgs> Connected;

  /// <summary>Raised at the end of every teardown performed by DisconnectInternalAsync.</summary>
  public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

  /// <summary>Raised for every PUBLISH packet handled by the receive loop.</summary>
  public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  /// <summary>
  /// Creates the channel adapter, connects it, performs the MQTT CONNECT/CONNACK
  /// handshake and starts the background receive loop and, when a keep-alive
  /// period is configured, the keep-alive loop.
  /// </summary>
  /// <param name="options">Client options; <c>ChannelOptions</c> must be set.</param>
  /// <returns>Result carrying the "session present" flag from the server's response.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException"><c>options.ChannelOptions</c> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">The client is already connected.</exception>
  /// <remarks>
  /// Any exception raised while connecting is logged, triggers a full teardown
  /// through DisconnectInternalAsync and is then rethrown.
  /// </remarks>
  public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
  {
      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ChannelOptions == null) throw new ArgumentException("ChannelOptions are not set.");

      ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");

      try
      {
          _cancellationTokenSource = new CancellationTokenSource();

          // Capture the token once. The receive loop can fail during the handshake and
          // run DisconnectInternalAsync, which sets _cancellationTokenSource to null;
          // re-reading the field after each await would then throw NullReferenceException
          // instead of observing the cancellation.
          var cancellationToken = _cancellationTokenSource.Token;

          _options = options;
          _packetIdentifierProvider.Reset();
          _packetDispatcher.Reset();

          _adapter = _adapterFactory.CreateClientAdapter(options, _logger);

          _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
          await _adapter.ConnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
          _logger.Verbose("Connection with server established.");

          // The receive loop must be running before CONNECT is sent so that the
          // CONNACK can be dispatched to the awaiter inside AuthenticateAsync.
          StartReceivingPackets(cancellationToken);

          var connectResponse = await AuthenticateAsync(options.WillMessage, cancellationToken).ConfigureAwait(false);
          _logger.Verbose("MQTT connection with server established.");

          _sendTracker.Restart();

          // A zero keep-alive period disables the keep-alive loop.
          if (_options.KeepAlivePeriod != TimeSpan.Zero)
          {
              StartSendingKeepAliveMessages(cancellationToken);
          }

          IsConnected = true;
          Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));

          _logger.Info("Connected.");
          return new MqttClientConnectResult(connectResponse.IsSessionPresent);
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while connecting with server.");
          await DisconnectInternalAsync(null, exception).ConfigureAwait(false);

          throw;
      }
  }
  /// <summary>
  /// Performs a clean disconnect: sends a DISCONNECT packet when the client is
  /// still connected and not yet cancelled, then always tears the connection down.
  /// </summary>
  /// <returns>A task that completes when the teardown has finished.</returns>
  public async Task DisconnectAsync()
  {
      try
      {
          // Tells the background loops to exit silently rather than report an error.
          _cleanDisconnectInitiated = true;

          // Read the field once and check it for null: Dispose() clears the source
          // without resetting IsConnected, which previously made this guard throw
          // NullReferenceException.
          var cancellationTokenSource = _cancellationTokenSource;
          if (IsConnected && cancellationTokenSource != null && !cancellationTokenSource.IsCancellationRequested)
          {
              await SendAsync(new MqttDisconnectPacket(), cancellationTokenSource.Token).ConfigureAwait(false);
          }
      }
      finally
      {
          // Runs even when sending the DISCONNECT packet failed.
          await DisconnectInternalAsync(null, null).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Sends a SUBSCRIBE packet for the given topic filters and waits for the SUBACK.
  /// </summary>
  /// <param name="topicFilters">Filters to subscribe to.</param>
  /// <returns>One result per topic filter, pairing the filter with its return code.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  /// <exception cref="MqttCommunicationException">The client is not connected.</exception>
  /// <exception cref="MqttProtocolViolationException">
  /// The number of return codes differs from the number of topic filters.
  /// </exception>
  public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

      ThrowIfNotConnected();

      var request = new MqttSubscribePacket();
      request.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
      request.TopicFilters = topicFilters.ToList();

      var acknowledgement = await SendAndReceiveAsync<MqttSubAckPacket>(request, _cancellationTokenSource.Token).ConfigureAwait(false);

      if (acknowledgement.SubscribeReturnCodes.Count != request.TopicFilters.Count)
      {
          throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1].");
      }

      // Return codes are matched to filters by position.
      var results = new List<MqttSubscribeResult>();
      var position = 0;
      foreach (var topicFilter in request.TopicFilters)
      {
          results.Add(new MqttSubscribeResult(topicFilter, acknowledgement.SubscribeReturnCodes[position]));
          position++;
      }

      return results;
  }
  /// <summary>
  /// Sends an UNSUBSCRIBE packet for the given topic filters and waits for the UNSUBACK.
  /// </summary>
  /// <param name="topicFilters">Topic filter strings to unsubscribe from.</param>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  /// <exception cref="MqttCommunicationException">The client is not connected.</exception>
  public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  {
      if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

      ThrowIfNotConnected();

      var request = new MqttUnsubscribePacket();
      request.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
      request.TopicFilters = topicFilters.ToList();

      // The acknowledgement carries nothing the caller needs; it is only awaited.
      await SendAndReceiveAsync<MqttUnsubAckPacket>(request, _cancellationTokenSource.Token).ConfigureAwait(false);
  }
  /// <summary>
  /// Publishes the given application messages. Messages are grouped by quality of
  /// service level and the groups are processed in ascending QoS order: QoS 0
  /// packets are sent as one batch, QoS 1 packets wait for a PUBACK each, and QoS 2
  /// packets run the PUBREC / PUBREL / PUBCOMP exchange each.
  /// </summary>
  /// <param name="applicationMessages">Messages to publish.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is null.</exception>
  /// <exception cref="MqttCommunicationException">The client is not connected.</exception>
  /// <exception cref="InvalidOperationException">A message carries an unknown QoS level.</exception>
  public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  {
      // Validate the argument up front with the correct parameter name, as
      // SubscribeAsync and UnsubscribeAsync do.
      if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

      ThrowIfNotConnected();

      // Capture the token once: a teardown that happens while messages are being
      // published nulls _cancellationTokenSource, and re-reading the field inside
      // the loop would throw NullReferenceException instead of observing the cancellation.
      var cancellationToken = _cancellationTokenSource.Token;

      var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
      var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);

      foreach (var qosGroup in packetGroups)
      {
          switch (qosGroup.Key)
          {
              case MqttQualityOfServiceLevel.AtMostOnce:
                  {
                      // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
                      await SendAsync(qosGroup, cancellationToken).ConfigureAwait(false);
                      break;
                  }

              case MqttQualityOfServiceLevel.AtLeastOnce:
                  {
                      foreach (var publishPacket in qosGroup)
                      {
                          publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
                          await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, cancellationToken).ConfigureAwait(false);
                      }

                      break;
                  }

              case MqttQualityOfServiceLevel.ExactlyOnce:
                  {
                      foreach (var publishPacket in qosGroup)
                      {
                          publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();

                          var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket, cancellationToken).ConfigureAwait(false);

                          // The release packet reuses the identifier echoed back in the PUBREC.
                          var pubRelPacket = new MqttPubRelPacket
                          {
                              PacketIdentifier = pubRecPacket.PacketIdentifier
                          };

                          await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket, cancellationToken).ConfigureAwait(false);
                      }

                      break;
                  }

              default:
                  {
                      throw new InvalidOperationException();
                  }
          }
      }
  }
  /// <summary>
  /// Disposes the cancellation token source (and clears the field) and disposes
  /// the channel adapter, if either exists.
  /// </summary>
  public void Dispose()
  {
      if (_cancellationTokenSource != null)
      {
          _cancellationTokenSource.Dispose();
      }

      _cancellationTokenSource = null;

      // The adapter reference is intentionally left in place, as in the original code.
      if (_adapter != null)
      {
          _adapter.Dispose();
      }
  }
  /// <summary>
  /// Sends the CONNECT packet built from the current options and waits for the CONNACK.
  /// </summary>
  /// <param name="willApplicationMessage">Will message to register with the server; may be null.</param>
  /// <param name="cancellationToken">Token that aborts the exchange.</param>
  /// <returns>The CONNACK packet when the server accepted the connection.</returns>
  /// <exception cref="MqttConnectingFailedException">
  /// The server answered with a return code other than "connection accepted".
  /// </exception>
  private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
  {
      // The packet field is a ushort number of seconds. Clamp instead of casting
      // blindly: converting an out-of-range double to ushort yields an unspecified
      // value in an unchecked context, so a period above 65535 s (or a negative one)
      // would otherwise announce an arbitrary keep-alive to the server.
      var keepAliveSeconds = Math.Max(ushort.MinValue, Math.Min(ushort.MaxValue, _options.KeepAlivePeriod.TotalSeconds));

      var connectPacket = new MqttConnectPacket
      {
          ClientId = _options.ClientId,
          Username = _options.Credentials?.Username,
          Password = _options.Credentials?.Password,
          CleanSession = _options.CleanSession,
          KeepAlivePeriod = (ushort)keepAliveSeconds,
          WillMessage = willApplicationMessage
      };

      var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket, cancellationToken).ConfigureAwait(false);
      if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
      {
          throw new MqttConnectingFailedException(response.ConnectReturnCode);
      }

      return response;
  }
  /// <summary>Guards operations that require an established connection.</summary>
  /// <exception cref="MqttCommunicationException">The client is not connected.</exception>
  private void ThrowIfNotConnected()
  {
      if (IsConnected)
      {
          return;
      }

      throw new MqttCommunicationException("The client is not connected.");
  }
  /// <summary>Guards operations that are only valid while disconnected.</summary>
  /// <param name="message">Message for the exception thrown when already connected.</param>
  /// <exception cref="MqttProtocolViolationException">The client is connected.</exception>
  private void ThrowIfConnected(string message)
  {
      if (!IsConnected)
      {
          return;
      }

      throw new MqttProtocolViolationException(message);
  }
  /// <summary>
  /// Tears the connection down: cancels the background work, waits for the
  /// background tasks, disconnects and disposes the adapter, resets the
  /// per-connection state and finally raises <c>Disconnected</c>.
  /// </summary>
  /// <param name="sender">
  /// Background task that requested the teardown, or null. This task is skipped
  /// when waiting for the background tasks to finish.
  /// </param>
  /// <param name="exception">Exception that caused the disconnect, or null; passed to the event.</param>
  private async Task DisconnectInternalAsync(Task sender, Exception exception)
  {
      await InitiateDisconnectAsync().ConfigureAwait(false);

      // Remember the previous state for the event before flipping the flag.
      var wasConnected = IsConnected;
      IsConnected = false;

      try
      {
          await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
          await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);

          if (_adapter != null)
          {
              await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
          }

          _logger.Verbose("Disconnected from adapter.");
      }
      catch (Exception disconnectException)
      {
          // Failures while shutting down are logged and otherwise ignored.
          _logger.Warning(disconnectException, "Error while disconnecting from adapter.");
      }
      finally
      {
          if (_adapter != null)
          {
              _adapter.Dispose();
          }

          _adapter = null;

          if (_cancellationTokenSource != null)
          {
              _cancellationTokenSource.Dispose();
          }

          _cancellationTokenSource = null;
          _cleanDisconnectInitiated = false;

          _logger.Info("Disconnected.");

          var disconnectedHandler = Disconnected;
          if (disconnectedHandler != null)
          {
              disconnectedHandler(this, new MqttClientDisconnectedEventArgs(wasConnected, exception));
          }
      }
  }
  /// <summary>
  /// Requests cancellation of the current connection's token source, once,
  /// while holding the disconnect lock. Does nothing when there is no source or
  /// cancellation was already requested.
  /// </summary>
  private async Task InitiateDisconnectAsync()
  {
      await _disconnectLock.WaitAsync().ConfigureAwait(false);
      try
      {
          var cancellationPending = _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;
          if (cancellationPending)
          {
              _cancellationTokenSource.Cancel(false);
          }
      }
      catch (Exception cancelException)
      {
          // Errors raised while cancelling are logged and swallowed.
          _logger.Warning(cancelException, "Error while initiating disconnect.");
      }
      finally
      {
          _disconnectLock.Release();
      }
  }
  /// <summary>Sends a single packet by forwarding it to the sequence overload.</summary>
  /// <param name="packet">Packet to send.</param>
  /// <param name="cancellationToken">Token that aborts the send.</param>
  /// <returns>The task returned by the sequence overload.</returns>
  private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  {
      var singlePacket = new MqttBasePacket[] { packet };
      return SendAsync(singlePacket, cancellationToken);
  }
  /// <summary>
  /// Sends the given packets through the adapter using the configured
  /// communication timeout and restarts the send tracker.
  /// </summary>
  /// <param name="packets">Packets to send.</param>
  /// <param name="cancellationToken">Token that aborts the send.</param>
  /// <returns>The adapter's send task.</returns>
  /// <exception cref="TaskCanceledException">
  /// Thrown synchronously when cancellation was already requested.
  /// </exception>
  private Task SendAsync(IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
  {
      if (cancellationToken.IsCancellationRequested) throw new TaskCanceledException();

      // Restarting the tracker records this send for the keep-alive loop.
      _sendTracker.Restart();

      var timeout = _options.CommunicationTimeout;
      return _adapter.SendPacketsAsync(timeout, packets, cancellationToken);
  }
  /// <summary>
  /// Sends <paramref name="requestPacket"/> and waits, bounded by the configured
  /// communication timeout, for the matching response of type
  /// <typeparamref name="TResponsePacket"/>.
  /// </summary>
  /// <typeparam name="TResponsePacket">Type of the expected response packet.</typeparam>
  /// <param name="requestPacket">Packet to send.</param>
  /// <param name="cancellationToken">Token that aborts the exchange.</param>
  /// <returns>The response packet delivered by the packet dispatcher.</returns>
  /// <exception cref="TaskCanceledException">Cancellation was already requested.</exception>
  /// <exception cref="MqttCommunicationTimedOutException">
  /// The exchange timed out; the timeout is logged and rethrown.
  /// </exception>
  private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
  {
      if (cancellationToken.IsCancellationRequested)
      {
          throw new TaskCanceledException();
      }

      _sendTracker.Restart();

      // Requests that carry no packet identifier are awaited under identifier 0.
      ushort identifier = 0;
      if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
      {
          identifier = packetWithIdentifier.PacketIdentifier.Value;
      }

      // The awaiter is registered before the request is sent, presumably so that
      // a response arriving immediately still finds it.
      var packetAwaiter = _packetDispatcher.AddPacketAwaiter<TResponsePacket>(identifier);
      try
      {
          await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, cancellationToken).ConfigureAwait(false);
          var response = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);

          return (TResponsePacket)response;
      }
      catch (MqttCommunicationTimedOutException)
      {
          // Log the packet type's name. The previous code logged Type.Namespace,
          // which is identical for every packet type and hid which response was missing.
          _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
          throw;
      }
      finally
      {
          // Always unregister the awaiter, whether the exchange succeeded or not.
          _packetDispatcher.RemovePacketAwaiter<TResponsePacket>(identifier);
      }
  }
  /// <summary>
  /// Background loop that sends a PINGREQ (and awaits the PINGRESP) whenever
  /// nothing has been sent for the keep-alive send interval. The interval is
  /// <c>KeepAliveSendInterval</c> when set, otherwise 75% of <c>KeepAlivePeriod</c>.
  /// </summary>
  /// <param name="cancellationToken">Token that stops the loop.</param>
  /// <remarks>
  /// On failure the loop tears the connection down via DisconnectInternalAsync,
  /// unless a clean disconnect is already in progress.
  /// </remarks>
  private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  {
      _logger.Verbose("Start sending keep alive packets.");

      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              var defaultInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
              var sendInterval = _options.KeepAliveSendInterval ?? defaultInterval;

              // Time left until a ping is due, measured from the most recent send.
              var remaining = sendInterval - _sendTracker.Elapsed;
              if (remaining <= TimeSpan.Zero)
              {
                  await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
                  remaining = sendInterval;
              }

              await Task.Delay(remaining, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          if (_cleanDisconnectInitiated)
          {
              return;
          }

          // Cancellation is not logged; everything else is.
          var isCancellation = exception is OperationCanceledException;
          if (!isCancellation)
          {
              if (exception is MqttCommunicationException)
              {
                  _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets.");
              }
              else
              {
                  _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
              }
          }

          await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
      }
      finally
      {
          _logger.Verbose("Stopped sending keep alive packets.");
      }
  }
  /// <summary>
  /// Background loop that reads packets from the adapter and hands each non-null
  /// packet to ProcessReceivedPacketAsync until cancellation is requested.
  /// </summary>
  /// <param name="cancellationToken">Token that stops the loop.</param>
  /// <remarks>
  /// On failure the exception is dispatched to the packet dispatcher and the
  /// connection is torn down, unless a clean disconnect is already in progress.
  /// </remarks>
  private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
  {
      _logger.Verbose("Start receiving packets.");

      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              var receivedPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
              if (receivedPacket == null)
              {
                  continue;
              }

              await ProcessReceivedPacketAsync(receivedPacket, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          if (_cleanDisconnectInitiated)
          {
              return;
          }

          // Cancellation is not logged; everything else is.
          var isCancellation = exception is OperationCanceledException;
          if (!isCancellation)
          {
              if (exception is MqttCommunicationException)
              {
                  _logger.Warning(exception, "MQTT communication exception while receiving packets.");
              }
              else
              {
                  _logger.Error(exception, "Unhandled exception while receiving packets.");
              }
          }

          // Hand the failure to the dispatcher before tearing the connection down.
          _packetDispatcher.Dispatch(exception);

          await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
      }
      finally
      {
          _logger.Verbose("Stopped receiving packets.");
      }
  }
  /// <summary>
  /// Routes a received packet: PUBLISH, PINGREQ, DISCONNECT and PUBREL are handled
  /// here; every other packet is passed to the packet dispatcher.
  /// </summary>
  /// <param name="packet">Packet read by the receive loop.</param>
  /// <param name="cancellationToken">Token forwarded to any response that is sent.</param>
  /// <returns>The task of the selected handler, or a completed task after dispatching.</returns>
  private Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  {
      switch (packet)
      {
          case MqttPublishPacket publishPacket:
              return ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken);

          case MqttPingReqPacket _:
              return SendAsync(new MqttPingRespPacket(), cancellationToken);

          case MqttDisconnectPacket _:
              // NOTE(review): DisconnectAsync ends in DisconnectInternalAsync(null, null),
              // which waits for _packetReceiverTask. When called from the receive loop
              // that looks like the loop waiting on itself - confirm.
              return DisconnectAsync();

          case MqttPubRelPacket pubRelPacket:
              return ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken);

          default:
              _packetDispatcher.Dispatch(packet);
              return Task.FromResult(0);
      }
  }
  /// <summary>
  /// Raises the application-message event for a received PUBLISH packet and sends
  /// the acknowledgement its QoS level requires: nothing for QoS 0, PUBACK for
  /// QoS 1, PUBREC for QoS 2.
  /// </summary>
  /// <param name="publishPacket">The received PUBLISH packet.</param>
  /// <param name="cancellationToken">Token forwarded to the acknowledgement send.</param>
  /// <returns>The acknowledgement send task, or a completed task for QoS 0.</returns>
  /// <exception cref="MqttCommunicationException">The packet carries an unknown QoS level.</exception>
  private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  {
      switch (publishPacket.QualityOfServiceLevel)
      {
          case MqttQualityOfServiceLevel.AtMostOnce:
              FireApplicationMessageReceivedEvent(publishPacket);
              return Task.FromResult(0);

          case MqttQualityOfServiceLevel.AtLeastOnce:
              FireApplicationMessageReceivedEvent(publishPacket);
              return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken);

          case MqttQualityOfServiceLevel.ExactlyOnce:
              // QoS 2 is implemented as method "B" (4.3.3 QoS 2: Exactly once delivery)
              FireApplicationMessageReceivedEvent(publishPacket);
              return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken);

          default:
              throw new MqttCommunicationException("Received a not supported QoS level.");
      }
  }
  /// <summary>
  /// Answers a received PUBREL with a PUBCOMP that carries the same packet identifier.
  /// </summary>
  /// <param name="pubRelPacket">The received PUBREL packet.</param>
  /// <param name="cancellationToken">Token forwarded to the send.</param>
  /// <returns>The send task of the PUBCOMP packet.</returns>
  private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
  {
      var pubCompPacket = new MqttPubCompPacket();
      pubCompPacket.PacketIdentifier = pubRelPacket.PacketIdentifier;

      return SendAsync(pubCompPacket, cancellationToken);
  }
  /// <summary>
  /// Starts the receive loop via <c>Task.Run</c> and stores its task in
  /// <c>_packetReceiverTask</c>.
  /// </summary>
  /// <param name="cancellationToken">Token passed to both the loop and <c>Task.Run</c>.</param>
  private void StartReceivingPackets(CancellationToken cancellationToken)
  {
      Func<Task> receiveLoop = () => ReceivePacketsAsync(cancellationToken);
      _packetReceiverTask = Task.Run(receiveLoop, cancellationToken);
  }
  /// <summary>
  /// Starts the keep-alive loop via <c>Task.Run</c> and stores its task in
  /// <c>_keepAliveMessageSenderTask</c>.
  /// </summary>
  /// <param name="cancellationToken">Token passed to both the loop and <c>Task.Run</c>.</param>
  private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
  {
      Func<Task> keepAliveLoop = () => SendKeepAliveMessagesAsync(cancellationToken);
      _keepAliveMessageSenderTask = Task.Run(keepAliveLoop, cancellationToken);
  }
  /// <summary>
  /// Converts a PUBLISH packet into an application message and raises
  /// <c>ApplicationMessageReceived</c>. Exceptions thrown by the conversion or by
  /// event handlers are logged and not propagated.
  /// </summary>
  /// <param name="publishPacket">The received PUBLISH packet.</param>
  private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
  {
      try
      {
          // The conversion runs even when no handler is subscribed.
          var message = publishPacket.ToApplicationMessage();

          var receivedHandler = ApplicationMessageReceived;
          if (receivedHandler != null)
          {
              receivedHandler(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, message));
          }
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Unhandled exception while handling application message.");
      }
  }
  /// <summary>
  /// Awaits <paramref name="task"/> unless it is null, is the same task as
  /// <paramref name="sender"/>, or has already finished (completed, faulted or
  /// canceled). A <see cref="TaskCanceledException"/> raised by the awaited task
  /// is swallowed; other exceptions propagate.
  /// </summary>
  /// <param name="task">Task to wait for; may be null.</param>
  /// <param name="sender">Task that must not be awaited; may be null.</param>
  private static async Task WaitForTaskAsync(Task task, Task sender)
  {
      // IsCompleted is also true for faulted and canceled tasks, so one check covers all three.
      var nothingToAwait = task == null || task == sender || task.IsCompleted;
      if (nothingToAwait)
      {
          return;
      }

      try
      {
          await task.ConfigureAwait(false);
      }
      catch (TaskCanceledException)
      {
          // Cancellation of the awaited task is an expected outcome here.
      }
  }
  460. }
  461. }