You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

560 regels
22 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MQTTnet.Adapter;
  8. using MQTTnet.Diagnostics;
  9. using MQTTnet.Exceptions;
  10. using MQTTnet.Internal;
  11. using MQTTnet.Packets;
  12. using MQTTnet.Protocol;
  13. namespace MQTTnet.Client
  14. {
  15. public class MqttClient : IMqttClient
  16. {
  // Source of packet identifiers for SUBSCRIBE, UNSUBSCRIBE and QoS 1/2 PUBLISH packets; reset on every ConnectAsync.
  17. private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
  // Restarted whenever a packet is sent; the keep-alive loop reads its elapsed time to decide whether a ping is due.
  18. private readonly Stopwatch _sendTracker = new Stopwatch();
  // Serializes the "already disconnecting?" check and the cancellation at the start of DisconnectInternalAsync.
  19. private readonly SemaphoreSlim _disconnectLock = new SemaphoreSlim(1, 1);
  // Matches received packets (or a receive failure) with the awaiters registered by SendAndReceiveAsync.
  20. private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  // Creates the channel adapter for each connection attempt.
  21. private readonly IMqttClientAdapterFactory _adapterFactory;
  // Child logger created from the logger passed to the constructor.
  22. private readonly IMqttNetChildLogger _logger;
  // Options passed to the most recent ConnectAsync call.
  23. private IMqttClientOptions _options;
  // Created by ConnectAsync; cancelled, disposed and set to null by DisconnectInternalAsync (Dispose also nulls it).
  24. private CancellationTokenSource _cancellationTokenSource;
  // Background task running ReceivePacketsAsync.
  25. private Task _packetReceiverTask;
  // Background task running SendKeepAliveMessagesAsync; only started when KeepAlivePeriod is not zero.
  26. private Task _keepAliveMessageSenderTask;
  // Adapter of the current connection; disposed and set to null by DisconnectInternalAsync.
  27. private IMqttChannelAdapter _adapter;
  /// <summary>
  /// Creates a client that obtains its channel adapter from <paramref name="channelFactory"/>
  /// and logs through a child logger created from <paramref name="logger"/>.
  /// </summary>
  /// <param name="channelFactory">Factory used by ConnectAsync to create the channel adapter.</param>
  /// <param name="logger">Parent logger; a child logger named after this class is created from it.</param>
  /// <exception cref="ArgumentNullException">Either argument is null.</exception>
  public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
  {
      // The logger is validated first so that it is the reported argument when both are null.
      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      if (channelFactory == null)
      {
          throw new ArgumentNullException(nameof(channelFactory));
      }

      _adapterFactory = channelFactory;
      _logger = logger.CreateChildLogger(nameof(MqttClient));
  }
  /// <summary>Raised at the end of a successful ConnectAsync, after IsConnected has been set to true.</summary>
  34. public event EventHandler<MqttClientConnectedEventArgs> Connected;
  /// <summary>Raised by the internal disconnect routine once the adapter has been disposed; the arguments carry whether the client was connected and the causing exception, if any.</summary>
  35. public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
  /// <summary>Raised for every received PUBLISH packet, regardless of its QoS level.</summary>
  36. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  /// <summary>True after ConnectAsync has completed successfully; reset to false when the internal disconnect routine runs.</summary>
  37. public bool IsConnected { get; private set; }
  /// <summary>
  /// Connects the channel adapter, starts the packet receiver, performs the MQTT CONNECT/CONNACK
  /// handshake and, when a keep alive period is configured, starts the keep-alive loop.
  /// </summary>
  /// <param name="options">Client options; <c>ChannelOptions</c> must be set.</param>
  /// <returns>The connect result carrying the "session present" flag of the CONNACK packet.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException"><c>options.ChannelOptions</c> is null.</exception>
  /// <remarks>
  /// Throws via <c>ThrowIfConnected</c> when the client is already connected. Any failure inside the
  /// connect sequence is logged, the internal disconnect routine is run, and the exception is rethrown.
  /// </remarks>
  public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
  {
      if (options == null) throw new ArgumentNullException(nameof(options));
      if (options.ChannelOptions == null) throw new ArgumentException("ChannelOptions are not set.");

      ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");

      try
      {
          _cancellationTokenSource = new CancellationTokenSource();

          // Capture the token once. The receiver task started below may run DisconnectInternalAsync
          // (e.g. when the server closes the socket), which disposes the source and sets the field to
          // null; re-reading the field after an await would then throw a NullReferenceException.
          var cancellationToken = _cancellationTokenSource.Token;

          _options = options;
          _packetIdentifierProvider.Reset();
          _packetDispatcher.Reset();

          _adapter = _adapterFactory.CreateClientAdapter(options, _logger);

          _logger.Verbose("Trying to connect with server.");
          await _adapter.ConnectAsync(_options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
          _logger.Verbose("Connection with server established.");

          StartReceivingPackets(cancellationToken);

          var connectResponse = await AuthenticateAsync(options.WillMessage, cancellationToken).ConfigureAwait(false);
          _logger.Verbose("MQTT connection with server established.");

          _sendTracker.Restart();

          if (_options.KeepAlivePeriod != TimeSpan.Zero)
          {
              StartSendingKeepAliveMessages(cancellationToken);
          }

          // If the connection was already torn down by the receiver task, report a meaningful
          // failure instead of flagging the client as connected.
          if (cancellationToken.IsCancellationRequested)
          {
              throw new MqttCommunicationException("The connection was closed while connecting.");
          }

          IsConnected = true;
          Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));

          _logger.Info("Connected.");
          return new MqttClientConnectResult(connectResponse.IsSessionPresent);
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Error while connecting with server.");
          await DisconnectInternalAsync(null, exception).ConfigureAwait(false);

          throw;
      }
  }
  /// <summary>
  /// Sends a DISCONNECT packet when the client is connected and no disconnect is in progress,
  /// then always runs the internal disconnect routine.
  /// </summary>
  /// <remarks>
  /// An exception thrown while sending the DISCONNECT packet propagates to the caller after the
  /// internal disconnect routine has run.
  /// </remarks>
  public async Task DisconnectAsync()
  {
      try
      {
          // Work on a snapshot: Dispose and DisconnectInternalAsync set the field to null, and
          // Dispose does so without resetting IsConnected, so the field may be null here.
          var cancellationTokenSource = _cancellationTokenSource;

          if (IsConnected && cancellationTokenSource != null && !cancellationTokenSource.IsCancellationRequested)
          {
              await SendAsync(new MqttDisconnectPacket(), cancellationTokenSource.Token).ConfigureAwait(false);
          }
      }
      finally
      {
          await DisconnectInternalAsync(null, null).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Sends a SUBSCRIBE packet for the given topic filters and waits for the matching SUBACK.
  /// </summary>
  /// <param name="topicFilters">The topic filters to subscribe to.</param>
  /// <returns>One result per topic filter, pairing the filter with the return code at the same position.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  /// <exception cref="MqttProtocolViolationException">The SUBACK does not contain exactly one return code per filter.</exception>
  public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  {
      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      ThrowIfNotConnected();

      var request = new MqttSubscribePacket();
      request.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
      request.TopicFilters = topicFilters.ToList();

      var subAck = await SendAndReceiveAsync<MqttSubAckPacket>(request, _cancellationTokenSource.Token).ConfigureAwait(false);

      if (subAck.SubscribeReturnCodes.Count != request.TopicFilters.Count)
      {
          throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1].");
      }

      // Pair every requested filter with the return code at the same position.
      var results = new List<MqttSubscribeResult>();
      var position = 0;
      foreach (var filter in request.TopicFilters)
      {
          results.Add(new MqttSubscribeResult(filter, subAck.SubscribeReturnCodes[position]));
          position++;
      }

      return results;
  }
  /// <summary>
  /// Sends an UNSUBSCRIBE packet for the given topic filters and waits for the matching UNSUBACK.
  /// </summary>
  /// <param name="topicFilters">The topic filters to unsubscribe from.</param>
  /// <exception cref="ArgumentNullException"><paramref name="topicFilters"/> is null.</exception>
  public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  {
      if (topicFilters == null)
      {
          throw new ArgumentNullException(nameof(topicFilters));
      }

      ThrowIfNotConnected();

      var request = new MqttUnsubscribePacket();
      request.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
      request.TopicFilters = topicFilters.ToList();

      await SendAndReceiveAsync<MqttUnsubAckPacket>(request, _cancellationTokenSource.Token).ConfigureAwait(false);
  }
  /// <summary>
  /// Publishes the given application messages, processing them grouped by QoS level in ascending order.
  /// </summary>
  /// <param name="applicationMessages">The messages to publish.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessages"/> is null.</exception>
  /// <exception cref="InvalidOperationException">A message carries a QoS level that is not handled.</exception>
  /// <remarks>
  /// QoS 0 packets of a group are sent in one batch without packet identifiers. QoS 1 packets are sent
  /// one at a time, each waiting for its PUBACK. QoS 2 packets are sent one at a time, each waiting for
  /// PUBREC, then sending PUBREL and waiting for PUBCOMP.
  /// </remarks>
  public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  {
      // Validate the argument up front, as SubscribeAsync and UnsubscribeAsync do.
      if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

      ThrowIfNotConnected();

      var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
      var packetGroups = publishPackets.GroupBy(p => p.QualityOfServiceLevel).OrderBy(g => g.Key);

      foreach (var qosGroup in packetGroups)
      {
          switch (qosGroup.Key)
          {
              case MqttQualityOfServiceLevel.AtMostOnce:
                  {
                      // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
                      await SendAsync(qosGroup, _cancellationTokenSource.Token).ConfigureAwait(false);
                      break;
                  }
              case MqttQualityOfServiceLevel.AtLeastOnce:
                  {
                      foreach (var publishPacket in qosGroup)
                      {
                          publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
                          await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false);
                      }

                      break;
                  }
              case MqttQualityOfServiceLevel.ExactlyOnce:
                  {
                      foreach (var publishPacket in qosGroup)
                      {
                          publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();

                          var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket, _cancellationTokenSource.Token).ConfigureAwait(false);

                          // The PUBREL reuses the identifier reported by the PUBREC.
                          var pubRelPacket = new MqttPubRelPacket
                          {
                              PacketIdentifier = pubRecPacket.PacketIdentifier
                          };

                          await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket, _cancellationTokenSource.Token).ConfigureAwait(false);
                      }

                      break;
                  }
              default:
                  {
                      throw new InvalidOperationException();
                  }
          }
      }
  }
  /// <summary>
  /// Disposes the cancellation token source (clearing the field afterwards) and the channel adapter, when present.
  /// </summary>
  public void Dispose()
  {
      if (_cancellationTokenSource != null)
      {
          _cancellationTokenSource.Dispose();
      }

      _cancellationTokenSource = null;

      if (_adapter != null)
      {
          _adapter.Dispose();
      }
  }
  /// <summary>
  /// Sends the CONNECT packet built from the current options and waits for the CONNACK.
  /// </summary>
  /// <param name="willApplicationMessage">Will message placed into the CONNECT packet; passed through as given.</param>
  /// <param name="cancellationToken">Token forwarded to the send/receive operation.</param>
  /// <returns>The CONNACK packet when the server accepted the connection.</returns>
  /// <exception cref="MqttConnectingFailedException">The CONNACK return code is not "connection accepted".</exception>
  private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage, CancellationToken cancellationToken)
  {
      var request = new MqttConnectPacket();
      request.ClientId = _options.ClientId;
      request.Username = _options.Credentials?.Username;
      request.Password = _options.Credentials?.Password;
      request.CleanSession = _options.CleanSession;
      // The packet carries the keep alive period as whole seconds in a 16 bit value.
      request.KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds;
      request.WillMessage = willApplicationMessage;

      var connAck = await SendAndReceiveAsync<MqttConnAckPacket>(request, cancellationToken).ConfigureAwait(false);

      if (connAck.ConnectReturnCode == MqttConnectReturnCode.ConnectionAccepted)
      {
          return connAck;
      }

      throw new MqttConnectingFailedException(connAck.ConnectReturnCode);
  }
  /// <summary>
  /// Throws a <c>MqttCommunicationException</c> unless <c>IsConnected</c> is true.
  /// </summary>
  private void ThrowIfNotConnected()
  {
      if (IsConnected)
      {
          return;
      }

      throw new MqttCommunicationException("The client is not connected.");
  }
  /// <summary>
  /// Throws a <c>MqttProtocolViolationException</c> with the given message when <c>IsConnected</c> is true.
  /// </summary>
  /// <param name="message">Message of the thrown exception.</param>
  private void ThrowIfConnected(string message)
  {
      if (!IsConnected)
      {
          return;
      }

      throw new MqttProtocolViolationException(message);
  }
  /// <summary>
  /// Tears the connection down: cancels the shared token, waits for the background tasks,
  /// disconnects and disposes the adapter, and raises <c>Disconnected</c>.
  /// </summary>
  /// <param name="sender">
  /// The background task that initiated the disconnect, or null. This task is not awaited,
  /// so a background task can call this method without waiting for itself.
  /// </param>
  /// <param name="exception">The exception that caused the disconnect, or null; forwarded to the event arguments.</param>
  /// <remarks>
  /// Returns immediately when there is no cancellation token source or it is already cancelled,
  /// which makes repeated or concurrent calls no-ops.
  /// </remarks>
  private async Task DisconnectInternalAsync(Task sender, Exception exception)
  {
      await _disconnectLock.WaitAsync().ConfigureAwait(false);
      try
      {
          var disconnectAlreadyHandled = _cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested;
          if (disconnectAlreadyHandled)
          {
              return;
          }

          _cancellationTokenSource.Cancel(false);
      }
      catch (Exception cancelException)
      {
          _logger.Warning(cancelException, "Error while disconnecting from adapter.");
      }
      finally
      {
          _disconnectLock.Release();
      }

      // Remember the previous state for the event arguments before resetting it.
      var wasConnected = IsConnected;
      IsConnected = false;

      try
      {
          await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
          await WaitForTaskAsync(_keepAliveMessageSenderTask, sender).ConfigureAwait(false);

          if (_adapter != null)
          {
              // The shared token is already cancelled, so the adapter disconnect runs without a token.
              await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
          }

          _logger.Verbose("Disconnected from adapter.");
      }
      catch (Exception disconnectException)
      {
          _logger.Warning(disconnectException, "Error while disconnecting from adapter.");
      }
      finally
      {
          _adapter?.Dispose();
          _adapter = null;

          _cancellationTokenSource?.Dispose();
          _cancellationTokenSource = null;

          _logger.Info("Disconnected.");

          var disconnectedEventArgs = new MqttClientDisconnectedEventArgs(wasConnected, exception);
          Disconnected?.Invoke(this, disconnectedEventArgs);
      }
  }
  /// <summary>
  /// Sends a single packet by wrapping it into a one-element array and delegating to the multi-packet overload.
  /// </summary>
  /// <param name="packet">The packet to send.</param>
  /// <param name="cancellationToken">Token forwarded to the multi-packet overload.</param>
  private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  {
      var singlePacket = new MqttBasePacket[] { packet };
      return SendAsync(singlePacket, cancellationToken);
  }
  /// <summary>
  /// Restarts the send tracker and hands the packets to the adapter, using the configured communication timeout.
  /// </summary>
  /// <param name="packets">The packets to send.</param>
  /// <param name="cancellationToken">Token checked before sending and forwarded to the adapter.</param>
  /// <exception cref="TaskCanceledException">Thrown synchronously when the token is already cancelled.</exception>
  private Task SendAsync(IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
  {
      if (!cancellationToken.IsCancellationRequested)
      {
          _sendTracker.Restart();
          return _adapter.SendPacketsAsync(_options.CommunicationTimeout, packets, cancellationToken);
      }

      throw new TaskCanceledException();
  }
  /// <summary>
  /// Sends <paramref name="requestPacket"/> and waits, within the configured communication timeout,
  /// for a response packet of type <typeparamref name="TResponsePacket"/>.
  /// </summary>
  /// <typeparam name="TResponsePacket">The expected response packet type.</typeparam>
  /// <param name="requestPacket">The packet to send; when it carries a packet identifier, the response is matched on it.</param>
  /// <param name="cancellationToken">Token checked before sending and forwarded to the send and wait operations.</param>
  /// <returns>The received response packet.</returns>
  /// <exception cref="TaskCanceledException">The token is already cancelled on entry.</exception>
  /// <exception cref="MqttCommunicationTimedOutException">Rethrown after logging when a timeout is reported.</exception>
  private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttBasePacket
  {
      if (cancellationToken.IsCancellationRequested)
      {
          throw new TaskCanceledException();
      }

      _sendTracker.Restart();

      // Packets without an identifier are awaited under identifier 0.
      ushort identifier = 0;
      if (requestPacket is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue)
      {
          identifier = packetWithIdentifier.PacketIdentifier.Value;
      }

      // Register the awaiter before sending so that a fast response cannot be missed.
      var packetAwaiter = _packetDispatcher.AddPacketAwaiter<TResponsePacket>(identifier);
      try
      {
          await _adapter.SendPacketsAsync(_options.CommunicationTimeout, new[] { requestPacket }, cancellationToken).ConfigureAwait(false);
          var response = await Internal.TaskExtensions.TimeoutAfter(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);

          return (TResponsePacket)response;
      }
      catch (MqttCommunicationTimedOutException)
      {
          // Log the type name; the namespace is the same for every packet type and tells nothing.
          _logger.Warning(null, "Timeout while waiting for packet of type '{0}'.", typeof(TResponsePacket).Name);
          throw;
      }
      finally
      {
          _packetDispatcher.RemovePacketAwaiter<TResponsePacket>(identifier);
      }
  }
  /// <summary>
  /// Background loop that sends a PINGREQ (and waits for the PINGRESP) whenever nothing has been
  /// sent for one keep-alive send interval.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  /// <remarks>
  /// The send interval is <c>KeepAliveSendInterval</c> from the options when set, otherwise 75% of
  /// <c>KeepAlivePeriod</c>. Any exception ends the loop and runs the internal disconnect routine;
  /// cancellation is not logged, communication exceptions are logged as warnings, everything else as errors.
  /// </remarks>
  private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  {
      _logger.Verbose("Start sending keep alive packets.");

      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
              if (_options.KeepAliveSendInterval.HasValue)
              {
                  keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
              }

              // Sleep only for what is left of the interval since the last send. Sleeping a full
              // interval after every check lets the gap between two packets approach twice the
              // interval, i.e. 1.5 times the keep alive period with the default factor, which is
              // the limit after which a server may drop the connection.
              var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
              if (waitTime <= TimeSpan.Zero)
              {
                  await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket(), cancellationToken).ConfigureAwait(false);
                  waitTime = keepAliveSendInterval;
              }

              await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (Exception exception)
      {
          if (exception is OperationCanceledException)
          {
              // Cancellation is the regular way to stop this loop; nothing to log.
          }
          else if (exception is MqttCommunicationException)
          {
              _logger.Warning(exception, "MQTT communication exception while sending/receiving keep alive packets.");
          }
          else
          {
              _logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
          }

          // Pass this task as sender so the disconnect routine does not wait for it.
          await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
      }
      finally
      {
          _logger.Verbose("Stopped sending keep alive packets.");
      }
  }
  /// <summary>
  /// Background loop that reads packets from the adapter until the token is cancelled and hands
  /// each packet to the processing routine selected by the configured processing mode.
  /// </summary>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  /// <remarks>
  /// Any exception ends the loop, runs the internal disconnect routine and is then dispatched to
  /// the packet dispatcher. Cancellation is not logged, communication exceptions are logged as
  /// warnings, everything else as errors.
  /// </remarks>
  private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
  {
      _logger.Verbose("Start receiving packets.");

      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              var receivedPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

              if (cancellationToken.IsCancellationRequested)
              {
                  return;
              }

              if (receivedPacket == null)
              {
                  continue;
              }

              switch (_options.ReceivedApplicationMessageProcessingMode)
              {
                  case MqttReceivedApplicationMessageProcessingMode.SingleThread:
                      // Process inline; the next packet is read only after this one is handled.
                      await ProcessReceivedPacketAsync(receivedPacket, cancellationToken).ConfigureAwait(false);
                      break;

                  case MqttReceivedApplicationMessageProcessingMode.DedicatedThread:
                      // Fire and forget; reading continues immediately.
                      StartProcessReceivedPacketAsync(receivedPacket, cancellationToken);
                      break;
              }
          }
      }
      catch (Exception exception)
      {
          var isCancellation = exception is OperationCanceledException;
          if (!isCancellation)
          {
              if (exception is MqttCommunicationException)
              {
                  _logger.Warning(exception, "MQTT communication exception while receiving packets.");
              }
              else
              {
                  _logger.Error(exception, "Unhandled exception while receiving packets.");
              }
          }

          // Pass this task as sender so the disconnect routine does not wait for it.
          await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
          _packetDispatcher.Dispatch(exception);
      }
      finally
      {
          _logger.Verbose("Stopped receiving packets.");
      }
  }
  /// <summary>
  /// Handles one received packet: PUBLISH, PINGREQ, DISCONNECT and PUBREL are processed here,
  /// every other packet is handed to the packet dispatcher.
  /// </summary>
  /// <param name="packet">The received packet.</param>
  /// <param name="cancellationToken">Token forwarded to the send operations.</param>
  /// <remarks>Exceptions are logged and not rethrown.</remarks>
  private async Task ProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  {
      try
      {
          if (packet is MqttPublishPacket publishPacket)
          {
              await ProcessReceivedPublishPacketAsync(publishPacket, cancellationToken).ConfigureAwait(false);
              return;
          }

          if (packet is MqttPingReqPacket)
          {
              await SendAsync(new MqttPingRespPacket(), cancellationToken).ConfigureAwait(false);
              return;
          }

          if (packet is MqttDisconnectPacket)
          {
              // Do not go through DisconnectAsync here: it passes no sender to the disconnect
              // routine, which then awaits the receiver task. In SingleThread mode this method runs
              // on that very task, so the task would wait for itself forever. Naming the receiver
              // task as sender skips that wait. No DISCONNECT is sent back, as the remote side
              // has already announced the end of the connection.
              await DisconnectInternalAsync(_packetReceiverTask, null).ConfigureAwait(false);
              return;
          }

          if (packet is MqttPubRelPacket pubRelPacket)
          {
              await ProcessReceivedPubRelPacket(pubRelPacket, cancellationToken).ConfigureAwait(false);
              return;
          }

          _packetDispatcher.Dispatch(packet);
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Unhandled exception while processing received packet.");
      }
  }
  /// <summary>
  /// Raises the application-message event for a received PUBLISH packet and sends the
  /// acknowledgement required by its QoS level (none for QoS 0, PUBACK for QoS 1, PUBREC for QoS 2).
  /// </summary>
  /// <param name="publishPacket">The received PUBLISH packet.</param>
  /// <param name="cancellationToken">Token forwarded to the send operation.</param>
  /// <returns>A completed task for QoS 0, otherwise the task of the acknowledgement send.</returns>
  /// <exception cref="MqttCommunicationException">Thrown synchronously for any other QoS level.</exception>
  private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  {
      switch (publishPacket.QualityOfServiceLevel)
      {
          case MqttQualityOfServiceLevel.AtMostOnce:
              FireApplicationMessageReceivedEvent(publishPacket);
              return Task.FromResult(0);

          case MqttQualityOfServiceLevel.AtLeastOnce:
              FireApplicationMessageReceivedEvent(publishPacket);
              var pubAck = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier };
              return SendAsync(pubAck, cancellationToken);

          case MqttQualityOfServiceLevel.ExactlyOnce:
              // QoS 2 is implemented as method "B" [4.3.3 QoS 2: Exactly once delivery]
              FireApplicationMessageReceivedEvent(publishPacket);
              var pubRec = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier };
              return SendAsync(pubRec, cancellationToken);

          default:
              throw new MqttCommunicationException("Received a not supported QoS level.");
      }
  }
  /// <summary>
  /// Answers a received PUBREL packet with a PUBCOMP packet carrying the same packet identifier.
  /// </summary>
  /// <param name="pubRelPacket">The received PUBREL packet.</param>
  /// <param name="cancellationToken">Token forwarded to the send operation.</param>
  private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
  {
      var pubComp = new MqttPubCompPacket();
      pubComp.PacketIdentifier = pubRelPacket.PacketIdentifier;

      return SendAsync(pubComp, cancellationToken);
  }
  /// <summary>
  /// Starts the packet receive loop via <c>Task.Run</c> and stores the resulting task.
  /// </summary>
  /// <param name="cancellationToken">Token passed both to the loop and to <c>Task.Run</c>.</param>
  private void StartReceivingPackets(CancellationToken cancellationToken)
  {
      Func<Task> receiveLoop = () => ReceivePacketsAsync(cancellationToken);
      _packetReceiverTask = Task.Run(receiveLoop, cancellationToken);
  }
  /// <summary>
  /// Starts the keep-alive loop via <c>Task.Run</c> and stores the resulting task.
  /// </summary>
  /// <param name="cancellationToken">Token passed both to the loop and to <c>Task.Run</c>.</param>
  private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
  {
      Func<Task> keepAliveLoop = () => SendKeepAliveMessagesAsync(cancellationToken);
      _keepAliveMessageSenderTask = Task.Run(keepAliveLoop, cancellationToken);
  }
  /// <summary>
  /// Starts processing of a received packet via <c>Task.Run</c> without keeping or awaiting the resulting task.
  /// </summary>
  /// <param name="packet">The received packet.</param>
  /// <param name="cancellationToken">Token passed both to the processing routine and to <c>Task.Run</c>.</param>
  private void StartProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
  {
      Func<Task> processPacket = () => ProcessReceivedPacketAsync(packet, cancellationToken);

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
      Task.Run(processPacket, cancellationToken);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  }
  /// <summary>
  /// Converts the PUBLISH packet into an application message and raises <c>ApplicationMessageReceived</c>.
  /// </summary>
  /// <param name="publishPacket">The received PUBLISH packet.</param>
  /// <remarks>Exceptions from the conversion or from event handlers are logged and not rethrown.</remarks>
  private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
  {
      try
      {
          // The conversion runs even when nobody is subscribed to the event.
          var message = publishPacket.ToApplicationMessage();

          var handler = ApplicationMessageReceived;
          if (handler == null)
          {
              return;
          }

          handler(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, message));
      }
      catch (Exception exception)
      {
          _logger.Error(exception, "Unhandled exception while handling application message.");
      }
  }
  /// <summary>
  /// Waits for <paramref name="task"/> to finish, ignoring a <c>TaskCanceledException</c> from it.
  /// </summary>
  /// <param name="task">The task to wait for; may be null.</param>
  /// <param name="sender">The task on whose behalf the wait happens; may be null.</param>
  /// <remarks>
  /// Returns without waiting when <paramref name="task"/> is null, is the same instance as
  /// <paramref name="sender"/>, or has already finished in any state. Exceptions other than
  /// <c>TaskCanceledException</c> propagate to the caller.
  /// </remarks>
  private static async Task WaitForTaskAsync(Task task, Task sender)
  {
      // IsCompleted is also true for cancelled and faulted tasks.
      var nothingToWaitFor = task == null || task == sender || task.IsCompleted;
      if (nothingToWaitFor)
      {
          return;
      }

      try
      {
          await task.ConfigureAwait(false);
      }
      catch (TaskCanceledException)
      {
          // A cancelled task counts as finished.
      }
  }
  471. }
  472. }