Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

516 řádky
20 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Adapter;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Packets;
  12. using MQTTnet.Protocol;
  13. namespace MQTTnet.Client
  14. {
  15. public class MqttClient : IMqttClient
  16. {
  17. private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
  18. private readonly Stopwatch _sendTracker = new Stopwatch();
  19. private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
  20. private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  21. private readonly IMqttClientAdapterFactory _adapterFactory;
  22. private readonly IMqttNetLogger _logger;
  23. private IMqttClientOptions _options;
  24. private CancellationTokenSource _cancellationTokenSource;
  25. private Task _packetReceiverTask;
  26. private Task _keepAliveMessageSenderTask;
  27. private IMqttChannelAdapter _adapter;
  28. public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
  29. {
  30. _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
  31. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  32. }
  33. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  34. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  35. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  36. public bool IsConnected { get; private set; }
  37. public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
  38. {
  39. if (options == null) throw new ArgumentNullException(nameof(options));
  40. if (options.ChannelOptions == null) throw new ArgumentException("ChannelOptions are not set.");
  41. ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
  42. try
  43. {
  44. _options = options;
  45. _cancellationTokenSource = new CancellationTokenSource();
  46. _packetIdentifierProvider.Reset();
  47. _packetDispatcher.Reset();
  48. _adapter = _adapterFactory.CreateClientAdapter(options, _logger);
  49. _logger.Verbose<MqttClient>("Trying to connect with server.");
  50. await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
  51. _logger.Verbose<MqttClient>("Connection with server established.");
  52. StartReceivingPackets(_cancellationTokenSource.Token);
  53. var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
  54. _logger.Verbose<MqttClient>("MQTT connection with server established.");
  55. _sendTracker.Restart();
  56. if (_options.KeepAlivePeriod != TimeSpan.Zero)
  57. {
  58. StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
  59. }
  60. IsConnected = true;
  61. Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
  62. _logger.Info<MqttClient>("Connected.");
  63. return new MqttClientConnectResult(connectResponse.IsSessionPresent);
  64. }
  65. catch (Exception exception)
  66. {
  67. _logger.Error<MqttClient>(exception, "Error while connecting with server.");
  68. await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
  69. throw;
  70. }
  71. }
  72. public async Task DisconnectAsync()
  73. {
  74. try
  75. {
  76. if (IsConnected && !_cancellationTokenSource.IsCancellationRequested)
  77. {
  78. await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
  79. }
  80. }
  81. finally
  82. {
  83. await DisconnectInternalAsync(null, null).ConfigureAwait(false);
  84. }
  85. }
  86. public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  87. {
  88. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  89. ThrowIfNotConnected();
  90. var subscribePacket = new MqttSubscribePacket
  91. {
  92. PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(),
  93. TopicFilters = topicFilters.ToList()
  94. };
  95. var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket).ConfigureAwait(false);
  96. if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count)
  97. {
  98. throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1].");
  99. }
  100. return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
  101. }
  102. public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  103. {
  104. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  105. ThrowIfNotConnected();
  106. var unsubscribePacket = new MqttUnsubscribePacket
  107. {
  108. PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(),
  109. TopicFilters = topicFilters.ToList()
  110. };
  111. await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket).ConfigureAwait(false);
  112. }
  113. public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  114. {
  115. ThrowIfNotConnected();
  116. var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
  117. var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);
  118. foreach (var qosGroup in packetGroups)
  119. {
  120. switch (qosGroup.Key)
  121. {
  122. case MqttQualityOfServiceLevel.AtMostOnce:
  123. {
  124. // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
  125. await SendAsync(qosGroup.Cast<MqttBasePacket>().ToArray()).ConfigureAwait(false);
  126. break;
  127. }
  128. case MqttQualityOfServiceLevel.AtLeastOnce:
  129. {
  130. foreach (var publishPacket in qosGroup)
  131. {
  132. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
  133. await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket).ConfigureAwait(false);
  134. }
  135. break;
  136. }
  137. case MqttQualityOfServiceLevel.ExactlyOnce:
  138. {
  139. foreach (var publishPacket in qosGroup)
  140. {
  141. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
  142. var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
  143. var pubRelPacket = new MqttPubRelPacket
  144. {
  145. PacketIdentifier = pubRecPacket.PacketIdentifier
  146. };
  147. await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket).ConfigureAwait(false);
  148. }
  149. break;
  150. }
  151. default:
  152. {
  153. throw new InvalidOperationException();
  154. }
  155. }
  156. }
  157. }
  158. public void Dispose()
  159. {
  160. _cancellationTokenSource?.Dispose();
  161. _cancellationTokenSource = null;
  162. _adapter?.Dispose();
  163. }
  164. private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
  165. {
  166. var connectPacket = new MqttConnectPacket
  167. {
  168. ClientId = _options.ClientId,
  169. Username = _options.Credentials?.Username,
  170. Password = _options.Credentials?.Password,
  171. CleanSession = _options.CleanSession,
  172. KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
  173. WillMessage = willApplicationMessage
  174. };
  175. var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
  176. if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  177. {
  178. throw new MqttConnectingFailedException(response.ConnectReturnCode);
  179. }
  180. return response;
  181. }
  182. private void ThrowIfNotConnected()
  183. {
  184. if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
  185. }
  186. private void ThrowIfConnected(string message)
  187. {
  188. if (IsConnected) throw new MqttProtocolViolationException(message);
  189. }
  190. private async Task DisconnectInternalAsync(Task sender, Exception exception)
  191. {
  192. await _disconnectLock.WaitAsync();
  193. try
  194. {
  195. if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
  196. {
  197. return;
  198. }
  199. _cancellationTokenSource.Cancel(false);
  200. }
  201. catch (Exception adapterException)
  202. {
  203. _logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
  204. }
  205. finally
  206. {
  207. _disconnectLock.Release();
  208. }
  209. var clientWasConnected = IsConnected;
  210. IsConnected = false;
  211. try
  212. {
  213. if (_packetReceiverTask != null && _packetReceiverTask != sender)
  214. {
  215. _packetReceiverTask.Wait();
  216. }
  217. if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
  218. {
  219. _keepAliveMessageSenderTask.Wait();
  220. }
  221. if (_adapter != null)
  222. {
  223. await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
  224. }
  225. _logger.Verbose<MqttClient>("Disconnected from adapter.");
  226. }
  227. catch (Exception adapterException)
  228. {
  229. _logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
  230. }
  231. finally
  232. {
  233. _adapter?.Dispose();
  234. _adapter = null;
  235. _cancellationTokenSource?.Dispose();
  236. _cancellationTokenSource = null;
  237. _logger.Info<MqttClient>("Disconnected.");
  238. Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
  239. }
  240. }
  241. private async Task ProcessReceivedPacketAsync(MqttBasePacket packet)
  242. {
  243. try
  244. {
  245. if (packet is MqttPublishPacket publishPacket)
  246. {
  247. await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false);
  248. return;
  249. }
  250. if (packet is MqttPingReqPacket)
  251. {
  252. await SendAsync(new MqttPingRespPacket()).ConfigureAwait(false);
  253. return;
  254. }
  255. if (packet is MqttDisconnectPacket)
  256. {
  257. await DisconnectAsync().ConfigureAwait(false);
  258. return;
  259. }
  260. if (packet is MqttPubRelPacket pubRelPacket)
  261. {
  262. await ProcessReceivedPubRelPacket(pubRelPacket).ConfigureAwait(false);
  263. return;
  264. }
  265. _packetDispatcher.Dispatch(packet);
  266. }
  267. catch (Exception exception)
  268. {
  269. _logger.Error<MqttClient>(exception, "Unhandled exception while processing received packet.");
  270. }
  271. }
  272. private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
  273. {
  274. try
  275. {
  276. var applicationMessage = publishPacket.ToApplicationMessage();
  277. ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage));
  278. }
  279. catch (Exception exception)
  280. {
  281. _logger.Error<MqttClient>(exception, "Unhandled exception while handling application message.");
  282. }
  283. }
  284. private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket)
  285. {
  286. if (_cancellationTokenSource.IsCancellationRequested)
  287. {
  288. return Task.FromResult(0);
  289. }
  290. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  291. {
  292. FireApplicationMessageReceivedEvent(publishPacket);
  293. return Task.FromResult(0);
  294. }
  295. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  296. {
  297. FireApplicationMessageReceivedEvent(publishPacket);
  298. return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  299. }
  300. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  301. {
  302. // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
  303. FireApplicationMessageReceivedEvent(publishPacket);
  304. return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  305. }
  306. throw new MqttCommunicationException("Received a not supported QoS level.");
  307. }
  308. private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
  309. {
  310. var response = new MqttPubCompPacket
  311. {
  312. PacketIdentifier = pubRelPacket.PacketIdentifier
  313. };
  314. return SendAsync(response);
  315. }
  316. private Task SendAsync(params MqttBasePacket[] packets)
  317. {
  318. _sendTracker.Restart();
  319. return _adapter.SendPacketsAsync(_options.CommunicationTimeout, packets, _cancellationTokenSource.Token);
  320. }
  321. private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
  322. {
  323. _sendTracker.Restart();
  324. ushort identifier = 0;
  325. if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
  326. {
  327. identifier = packetWithIdentifier.PacketIdentifier.Value;
  328. }
  329. var packetAwaiter = _packetDispatcher.AddPacketAwaiter<TResponsePacket>(identifier);
  330. try
  331. {
  332. await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, _cancellationTokenSource.Token).ConfigureAwait(false);
  333. var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
  334. return (TResponsePacket)respone;
  335. }
  336. catch (MqttCommunicationTimedOutException)
  337. {
  338. _logger.Warning<MqttPacketDispatcher>($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'.");
  339. throw;
  340. }
  341. finally
  342. {
  343. _packetDispatcher.RemovePacketAwaiter<TResponsePacket>(identifier);
  344. }
  345. }
  346. private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  347. {
  348. _logger.Verbose<MqttClient>("Start sending keep alive packets.");
  349. try
  350. {
  351. while (!cancellationToken.IsCancellationRequested)
  352. {
  353. var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
  354. if (_options.KeepAliveSendInterval.HasValue)
  355. {
  356. keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
  357. }
  358. if (_sendTracker.Elapsed > keepAliveSendInterval)
  359. {
  360. await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
  361. }
  362. await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
  363. }
  364. }
  365. catch (Exception exception)
  366. {
  367. if (exception is OperationCanceledException)
  368. {
  369. }
  370. else if (exception is MqttCommunicationException)
  371. {
  372. _logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
  373. }
  374. else
  375. {
  376. _logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
  377. }
  378. await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
  379. }
  380. finally
  381. {
  382. _logger.Verbose<MqttClient>("Stopped sending keep alive packets.");
  383. }
  384. }
  385. private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
  386. {
  387. _logger.Verbose<MqttClient>("Start receiving packets.");
  388. try
  389. {
  390. while (!cancellationToken.IsCancellationRequested)
  391. {
  392. var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
  393. if (cancellationToken.IsCancellationRequested)
  394. {
  395. return;
  396. }
  397. StartProcessReceivedPacket(packet, cancellationToken);
  398. }
  399. }
  400. catch (Exception exception)
  401. {
  402. if (exception is OperationCanceledException)
  403. {
  404. }
  405. else if (exception is MqttCommunicationException)
  406. {
  407. _logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
  408. }
  409. else
  410. {
  411. _logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");
  412. }
  413. await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
  414. _packetDispatcher.Dispatch(exception);
  415. }
  416. finally
  417. {
  418. _logger.Verbose<MqttClient>("Stopped receiving packets.");
  419. }
  420. }
  421. private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
  422. {
  423. Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken);
  424. }
  425. private void StartReceivingPackets(CancellationToken cancellationToken)
  426. {
  427. _packetReceiverTask = Task.Run(() => ReceivePacketsAsync(cancellationToken), cancellationToken);
  428. }
  429. private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
  430. {
  431. _keepAliveMessageSenderTask = Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken);
  432. }
  433. }
  434. }