You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

559 lines
22 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Adapter;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Packets;
  12. using MQTTnet.Protocol;
  13. namespace MQTTnet.Client
  14. {
  15. public class MqttClient : IMqttClient
  16. {
  17. private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
  18. private readonly Stopwatch _sendTracker = new Stopwatch();
  19. private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
  20. private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  21. private readonly IMqttClientAdapterFactory _adapterFactory;
  22. private readonly IMqttNetLogger _logger;
  23. private IMqttClientOptions _options;
  24. private CancellationTokenSource _cancellationTokenSource;
  25. private Task _packetReceiverTask;
  26. private Task _keepAliveMessageSenderTask;
  27. private IMqttChannelAdapter _adapter;
  28. public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
  29. {
  30. _adapterFactory = channelFactory ?? throw new ArgumentNullException(nameof(channelFactory));
  31. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  32. }
  33. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  34. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  35. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  36. public bool IsConnected { get; private set; }
  37. public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
  38. {
  39. if (options == null) throw new ArgumentNullException(nameof(options));
  40. if (options.ChannelOptions == null) throw new ArgumentException("ChannelOptions are not set.");
  41. ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
  42. try
  43. {
  44. _cancellationTokenSource = new CancellationTokenSource();
  45. _options = options;
  46. _packetIdentifierProvider.Reset();
  47. _packetDispatcher.Reset();
  48. _adapter = _adapterFactory.CreateClientAdapter(options, _logger);
  49. _logger.Verbose<MqttClient>("Trying to connect with server.");
  50. await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
  51. _logger.Verbose<MqttClient>("Connection with server established.");
  52. StartReceivingPackets(_cancellationTokenSource.Token);
  53. var connectResponse = await AuthenticateAsync(options.WillMessage, _cancellationTokenSource.Token).ConfigureAwait(false);
  54. _logger.Verbose<MqttClient>("MQTT connection with server established.");
  55. _sendTracker.Restart();
  56. if (_options.KeepAlivePeriod != TimeSpan.Zero)
  57. {
  58. StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
  59. }
  60. IsConnected = true;
  61. Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
  62. _logger.Info<MqttClient>("Connected.");
  63. return new MqttClientConnectResult(connectResponse.IsSessionPresent);
  64. }
  65. catch (Exception exception)
  66. {
  67. _logger.Error<MqttClient>(exception, "Error while connecting with server.");
  68. await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
  69. throw;
  70. }
  71. }
  72. public async Task DisconnectAsync()
  73. {
  74. try
  75. {
  76. if (IsConnected && !_cancellationTokenSource.IsCancellationRequested)
  77. {
  78. await SendAsync(new MqttDisconnectPacket(), _cancellationTokenSource.Token).ConfigureAwait(false);
  79. }
  80. }
  81. finally
  82. {
  83. await DisconnectInternalAsync(null, null).ConfigureAwait(false);
  84. }
  85. }
  86. public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  87. {
  88. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  89. ThrowIfNotConnected();
  90. var subscribePacket = new MqttSubscribePacket
  91. {
  92. PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(),
  93. TopicFilters = topicFilters.ToList()
  94. };
  95. var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false);
  96. if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count)
  97. {
  98. throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1].");
  99. }
  100. return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
  101. }
  102. public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  103. {
  104. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  105. ThrowIfNotConnected();
  106. var unsubscribePacket = new MqttUnsubscribePacket
  107. {
  108. PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier(),
  109. TopicFilters = topicFilters.ToList()
  110. };
  111. await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket, _cancellationTokenSource.Token).ConfigureAwait(false);
  112. }
  113. public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  114. {
  115. ThrowIfNotConnected();
  116. var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
  117. var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);
  118. foreach (var qosGroup in packetGroups)
  119. {
  120. switch (qosGroup.Key)
  121. {
  122. case MqttQualityOfServiceLevel.AtMostOnce:
  123. {
  124. // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
  125. await SendAsync(qosGroup, _cancellationTokenSource.Token).ConfigureAwait(false);
  126. break;
  127. }
  128. case MqttQualityOfServiceLevel.AtLeastOnce:
  129. {
  130. foreach (var publishPacket in qosGroup)
  131. {
  132. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
  133. await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false);
  134. }
  135. break;
  136. }
  137. case MqttQualityOfServiceLevel.ExactlyOnce:
  138. {
  139. foreach (var publishPacket in qosGroup)
  140. {
  141. publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
  142. var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false);
  143. var pubRelPacket = new MqttPubRelPacket
  144. {
  145. PacketIdentifier = pubRecPacket.PacketIdentifier
  146. };
  147. await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket, _cancellationTokenSource.Token).ConfigureAwait(false);
  148. }
  149. break;
  150. }
  151. default:
  152. {
  153. throw new InvalidOperationException();
  154. }
  155. }
  156. }
  157. }
  158. public void Dispose()
  159. {
  160. _cancellationTokenSource?.Dispose();
  161. _cancellationTokenSource = null;
  162. _adapter?.Dispose();
  163. }
  164. private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
  165. {
  166. var connectPacket = new MqttConnectPacket
  167. {
  168. ClientId = _options.ClientId,
  169. Username = _options.Credentials?.Username,
  170. Password = _options.Credentials?.Password,
  171. CleanSession = _options.CleanSession,
  172. KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
  173. WillMessage = willApplicationMessage
  174. };
  175. var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket, cancellationToken).ConfigureAwait(false);
  176. if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  177. {
  178. throw new MqttConnectingFailedException(response.ConnectReturnCode);
  179. }
  180. return response;
  181. }
  182. private void ThrowIfNotConnected()
  183. {
  184. if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
  185. }
  186. private void ThrowIfConnected(string message)
  187. {
  188. if (IsConnected) throw new MqttProtocolViolationException(message);
  189. }
  190. private async Task DisconnectInternalAsync(Task sender, Exception exception)
  191. {
  192. await _disconnectLock.WaitAsync();
  193. try
  194. {
  195. if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
  196. {
  197. return;
  198. }
  199. _cancellationTokenSource.Cancel(false);
  200. }
  201. catch (Exception adapterException)
  202. {
  203. _logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
  204. }
  205. finally
  206. {
  207. _disconnectLock.Release();
  208. }
  209. var clientWasConnected = IsConnected;
  210. IsConnected = false;
  211. try
  212. {
  213. await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
  214. await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);
  215. if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender)
  216. {
  217. await _keepAliveMessageSenderTask.ConfigureAwait(false);
  218. }
  219. if (_adapter != null)
  220. {
  221. await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
  222. }
  223. _logger.Verbose<MqttClient>("Disconnected from adapter.");
  224. }
  225. catch (Exception adapterException)
  226. {
  227. _logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter.");
  228. }
  229. finally
  230. {
  231. _adapter?.Dispose();
  232. _adapter = null;
  233. _cancellationTokenSource?.Dispose();
  234. _cancellationTokenSource = null;
  235. _logger.Info<MqttClient>("Disconnected.");
  236. Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
  237. }
  238. }
  239. private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  240. {
  241. return SendAsync(new[] { packet }, cancellationToken);
  242. }
  243. private Task SendAsync(IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
  244. {
  245. if (cancellationToken.IsCancellationRequested)
  246. {
  247. throw new TaskCanceledException();
  248. }
  249. _sendTracker.Restart();
  250. return _adapter.SendPacketsAsync(_options.CommunicationTimeout, packets, cancellationToken);
  251. }
  252. private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
  253. {
  254. if (cancellationToken.IsCancellationRequested)
  255. {
  256. throw new TaskCanceledException();
  257. }
  258. _sendTracker.Restart();
  259. ushort identifier = 0;
  260. if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
  261. {
  262. identifier = packetWithIdentifier.PacketIdentifier.Value;
  263. }
  264. var packetAwaiter = _packetDispatcher.AddPacketAwaiter<TResponsePacket>(identifier);
  265. try
  266. {
  267. await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, cancellationToken).ConfigureAwait(false);
  268. var respone = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
  269. return (TResponsePacket)respone;
  270. }
  271. catch (MqttCommunicationTimedOutException)
  272. {
  273. _logger.Warning<MqttPacketDispatcher>($"Timeout while waiting for packet of type '{typeof(TResponsePacket).Namespace}'.");
  274. throw;
  275. }
  276. finally
  277. {
  278. _packetDispatcher.RemovePacketAwaiter<TResponsePacket>(identifier);
  279. }
  280. }
  281. private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  282. {
  283. _logger.Verbose<MqttClient>("Start sending keep alive packets.");
  284. try
  285. {
  286. while (!cancellationToken.IsCancellationRequested)
  287. {
  288. var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
  289. if (_options.KeepAliveSendInterval.HasValue)
  290. {
  291. keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
  292. }
  293. if (_sendTracker.Elapsed > keepAliveSendInterval)
  294. {
  295. await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
  296. }
  297. await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
  298. }
  299. }
  300. catch (Exception exception)
  301. {
  302. if (exception is OperationCanceledException)
  303. {
  304. }
  305. else if (exception is MqttCommunicationException)
  306. {
  307. _logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets.");
  308. }
  309. else
  310. {
  311. _logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets.");
  312. }
  313. await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
  314. }
  315. finally
  316. {
  317. _logger.Verbose<MqttClient>("Stopped sending keep alive packets.");
  318. }
  319. }
  320. private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
  321. {
  322. _logger.Verbose<MqttClient>("Start receiving packets.");
  323. try
  324. {
  325. while (!cancellationToken.IsCancellationRequested)
  326. {
  327. var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
  328. if (cancellationToken.IsCancellationRequested)
  329. {
  330. return;
  331. }
  332. if (packet == null)
  333. {
  334. continue;
  335. }
  336. if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.SingleThread)
  337. {
  338. await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
  339. }
  340. else if (_options.ReceivedApplicationMessageProcessingMode == MqttReceivedApplicationMessageProcessingMode.DedicatedThread)
  341. {
  342. StartProcessReceivedPacketAsync(packet, cancellationToken);
  343. }
  344. }
  345. }
  346. catch (Exception exception)
  347. {
  348. if (exception is OperationCanceledException)
  349. {
  350. }
  351. else if (exception is MqttCommunicationException)
  352. {
  353. _logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets.");
  354. }
  355. else
  356. {
  357. _logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets.");
  358. }
  359. await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
  360. _packetDispatcher.Dispatch(exception);
  361. }
  362. finally
  363. {
  364. _logger.Verbose<MqttClient>("Stopped receiving packets.");
  365. }
  366. }
  367. private async Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  368. {
  369. try
  370. {
  371. if (packet is MqttPublishPacket publishPacket)
  372. {
  373. await ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
  374. return;
  375. }
  376. if (packet is MqttPingReqPacket)
  377. {
  378. await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false);
  379. return;
  380. }
  381. if (packet is MqttDisconnectPacket)
  382. {
  383. await DisconnectAsync().ConfigureAwait(false);
  384. return;
  385. }
  386. if (packet is MqttPubRelPacket pubRelPacket)
  387. {
  388. await ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken).ConfigureAwait(false);
  389. return;
  390. }
  391. _packetDispatcher.Dispatch(packet);
  392. }
  393. catch (Exception exception)
  394. {
  395. _logger.Error<MqttClient>(exception, "Unhandled exception while processing received packet.");
  396. }
  397. }
  398. private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  399. {
  400. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  401. {
  402. FireApplicationMessageReceivedEvent(publishPacket);
  403. return Task.FromResult(0);
  404. }
  405. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  406. {
  407. FireApplicationMessageReceivedEvent(publishPacket);
  408. return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken);
  409. }
  410. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  411. {
  412. // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
  413. FireApplicationMessageReceivedEvent(publishPacket);
  414. return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken);
  415. }
  416. throw new MqttCommunicationException("Received a not supported QoS level.");
  417. }
  418. private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
  419. {
  420. var response = new MqttPubCompPacket
  421. {
  422. PacketIdentifier = pubRelPacket.PacketIdentifier
  423. };
  424. return SendAsync(response, cancellationToken);
  425. }
  426. private void StartReceivingPackets(CancellationToken cancellationToken)
  427. {
  428. _packetReceiverTask = Task.Run(() => ReceivePacketsAsync(cancellationToken), cancellationToken);
  429. }
  430. private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
  431. {
  432. _keepAliveMessageSenderTask = Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken);
  433. }
  434. private void StartProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  435. {
  436. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  437. Task.Run(() => ProcessReceivedPacketAsync(packet, cancellationToken), cancellationToken);
  438. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  439. }
  440. private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
  441. {
  442. try
  443. {
  444. var applicationMessage = publishPacket.ToApplicationMessage();
  445. ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage));
  446. }
  447. catch (Exception exception)
  448. {
  449. _logger.Error<MqttClient>(exception, "Unhandled exception while handling application message.");
  450. }
  451. }
  452. private static async Task WaitForTaskAsync(Task task, Task sender)
  453. {
  454. if (task == sender || task == null)
  455. {
  456. return;
  457. }
  458. if (task.IsCanceled || task.IsCompleted || task.IsFaulted)
  459. {
  460. return;
  461. }
  462. try
  463. {
  464. await task.ConfigureAwait(false);
  465. }
  466. catch (TaskCanceledException)
  467. {
  468. }
  469. }
  470. }
  471. }