You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

436 lines
17 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Adapter;
  7. using MQTTnet.Core.Diagnostics;
  8. using MQTTnet.Core.Exceptions;
  9. using MQTTnet.Core.Internal;
  10. using MQTTnet.Core.Packets;
  11. using MQTTnet.Core.Protocol;
  12. namespace MQTTnet.Core.Client
  13. {
  14. public class MqttClient : IMqttClient
  15. {
  16. private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
  17. private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
  18. private readonly MqttClientOptions _options;
  19. private readonly IMqttCommunicationAdapter _adapter;
  20. private int _latestPacketIdentifier;
  21. private CancellationTokenSource _cancellationTokenSource;
  22. public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter)
  23. {
  24. _options = options ?? throw new ArgumentNullException(nameof(options));
  25. _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
  26. _adapter.PacketSerializer.ProtocolVersion = options.ProtocolVersion;
  27. }
  28. public event EventHandler Connected;
  29. public event EventHandler Disconnected;
  30. public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
  31. public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;
  32. public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null)
  33. {
  34. ThrowIfConnected("It is not allowed to connect with a server after the connection is established.");
  35. try
  36. {
  37. _cancellationTokenSource = new CancellationTokenSource();
  38. _latestPacketIdentifier = 0;
  39. _packetDispatcher.Reset();
  40. MqttTrace.Verbose(nameof(MqttClient), "Trying to connect with server.");
  41. await _adapter.ConnectAsync(_options.DefaultCommunicationTimeout, _options).ConfigureAwait(false);
  42. MqttTrace.Verbose(nameof(MqttClient), "Connection with server established.");
  43. StartReceivePackets(_cancellationTokenSource.Token);
  44. var connectPacket = new MqttConnectPacket
  45. {
  46. ClientId = _options.ClientId,
  47. Username = _options.UserName,
  48. Password = _options.Password,
  49. CleanSession = _options.CleanSession,
  50. KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
  51. WillMessage = willApplicationMessage
  52. };
  53. var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false);
  54. if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
  55. {
  56. throw new MqttConnectingFailedException(response.ConnectReturnCode);
  57. }
  58. MqttTrace.Verbose(nameof(MqttClient), "MQTT connection with server established.");
  59. if (_options.KeepAlivePeriod != TimeSpan.Zero)
  60. {
  61. StartSendKeepAliveMessages(_cancellationTokenSource.Token);
  62. }
  63. Connected?.Invoke(this, EventArgs.Empty);
  64. }
  65. catch (Exception)
  66. {
  67. await DisconnectInternalAsync().ConfigureAwait(false);
  68. throw;
  69. }
  70. }
  71. public async Task DisconnectAsync()
  72. {
  73. if (!IsConnected)
  74. {
  75. return;
  76. }
  77. try
  78. {
  79. await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false);
  80. }
  81. finally
  82. {
  83. await DisconnectInternalAsync().ConfigureAwait(false);
  84. }
  85. }
  86. public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
  87. {
  88. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  89. ThrowIfNotConnected();
  90. var subscribePacket = new MqttSubscribePacket
  91. {
  92. PacketIdentifier = GetNewPacketIdentifier(),
  93. TopicFilters = topicFilters.ToList()
  94. };
  95. var response = await SendAndReceiveAsync<MqttSubAckPacket>(subscribePacket).ConfigureAwait(false);
  96. if (response.SubscribeReturnCodes.Count != subscribePacket.TopicFilters.Count)
  97. {
  98. throw new MqttProtocolViolationException("The return codes are not matching the topic filters [MQTT-3.9.3-1].");
  99. }
  100. return subscribePacket.TopicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
  101. }
  102. public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
  103. {
  104. if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
  105. ThrowIfNotConnected();
  106. var unsubscribePacket = new MqttUnsubscribePacket
  107. {
  108. PacketIdentifier = GetNewPacketIdentifier(),
  109. TopicFilters = topicFilters.ToList()
  110. };
  111. await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket);
  112. }
  113. public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
  114. {
  115. ThrowIfNotConnected();
  116. var publishPackets = applicationMessages.Select(m => m.ToPublishPacket());
  117. foreach (var qosGroup in publishPackets.GroupBy(p => p.QualityOfServiceLevel))
  118. {
  119. var qosPackets = qosGroup.ToArray();
  120. switch (qosGroup.Key)
  121. {
  122. case MqttQualityOfServiceLevel.AtMostOnce:
  123. {
  124. // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
  125. await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, qosPackets);
  126. break;
  127. }
  128. case MqttQualityOfServiceLevel.AtLeastOnce:
  129. {
  130. foreach (var publishPacket in qosPackets)
  131. {
  132. publishPacket.PacketIdentifier = GetNewPacketIdentifier();
  133. await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
  134. }
  135. break;
  136. }
  137. case MqttQualityOfServiceLevel.ExactlyOnce:
  138. {
  139. foreach (var publishPacket in qosPackets)
  140. {
  141. publishPacket.PacketIdentifier = GetNewPacketIdentifier();
  142. var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
  143. await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
  144. }
  145. break;
  146. }
  147. default:
  148. {
  149. throw new InvalidOperationException();
  150. }
  151. }
  152. }
  153. }
  154. private void ThrowIfNotConnected()
  155. {
  156. if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
  157. }
  158. private void ThrowIfConnected(string message)
  159. {
  160. if (IsConnected) throw new MqttProtocolViolationException(message);
  161. }
  162. private async Task DisconnectInternalAsync()
  163. {
  164. var cts = _cancellationTokenSource;
  165. if (cts == null || cts.IsCancellationRequested)
  166. {
  167. return;
  168. }
  169. cts.Cancel(false);
  170. cts.Dispose();
  171. _cancellationTokenSource = null;
  172. try
  173. {
  174. await _adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
  175. MqttTrace.Information(nameof(MqttClient), "Disconnected from adapter.");
  176. }
  177. catch (Exception exception)
  178. {
  179. MqttTrace.Warning(nameof(MqttClient), exception, "Error while disconnecting from adapter.");
  180. }
  181. finally
  182. {
  183. Disconnected?.Invoke(this, EventArgs.Empty);
  184. }
  185. }
  186. private async Task ProcessReceivedPacketAsync(MqttBasePacket packet)
  187. {
  188. try
  189. {
  190. MqttTrace.Information(nameof(MqttClient), "Received <<< {0}", packet);
  191. if (packet is MqttPingReqPacket)
  192. {
  193. await SendAsync(new MqttPingRespPacket());
  194. return;
  195. }
  196. if (packet is MqttDisconnectPacket)
  197. {
  198. await DisconnectAsync();
  199. return;
  200. }
  201. if (packet is MqttPublishPacket publishPacket)
  202. {
  203. await ProcessReceivedPublishPacket(publishPacket);
  204. return;
  205. }
  206. if (packet is MqttPubRelPacket pubRelPacket)
  207. {
  208. await ProcessReceivedPubRelPacket(pubRelPacket);
  209. return;
  210. }
  211. _packetDispatcher.Dispatch(packet);
  212. }
  213. catch (Exception exception)
  214. {
  215. MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while processing received packet.");
  216. }
  217. }
  218. private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
  219. {
  220. try
  221. {
  222. var applicationMessage = publishPacket.ToApplicationMessage();
  223. ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage));
  224. }
  225. catch (Exception exception)
  226. {
  227. MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
  228. }
  229. }
  230. private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
  231. {
  232. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  233. {
  234. FireApplicationMessageReceivedEvent(publishPacket);
  235. return;
  236. }
  237. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  238. {
  239. FireApplicationMessageReceivedEvent(publishPacket);
  240. await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  241. return;
  242. }
  243. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  244. {
  245. // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
  246. lock (_unacknowledgedPublishPackets)
  247. {
  248. _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
  249. }
  250. FireApplicationMessageReceivedEvent(publishPacket);
  251. await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  252. return;
  253. }
  254. throw new MqttCommunicationException("Received a not supported QoS level.");
  255. }
  256. private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
  257. {
  258. lock (_unacknowledgedPublishPackets)
  259. {
  260. _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
  261. }
  262. return SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
  263. }
  264. private Task SendAsync(MqttBasePacket packet)
  265. {
  266. return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet);
  267. }
  268. private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
  269. {
  270. var wait = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout);
  271. await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);
  272. return (TResponsePacket)await wait.ConfigureAwait(false);
  273. }
  274. private ushort GetNewPacketIdentifier()
  275. {
  276. return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
  277. }
  278. private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken)
  279. {
  280. MqttTrace.Information(nameof(MqttClient), "Start sending keep alive packets.");
  281. try
  282. {
  283. while (!cancellationToken.IsCancellationRequested)
  284. {
  285. await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
  286. if (cancellationToken.IsCancellationRequested)
  287. {
  288. return;
  289. }
  290. await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
  291. }
  292. }
  293. catch (TaskCanceledException)
  294. {
  295. }
  296. catch (MqttCommunicationException exception)
  297. {
  298. if (cancellationToken.IsCancellationRequested)
  299. {
  300. return;
  301. }
  302. MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending/receiving keep alive packets.");
  303. await DisconnectInternalAsync().ConfigureAwait(false);
  304. }
  305. catch (Exception exception)
  306. {
  307. MqttTrace.Warning(nameof(MqttClient), exception, "Unhandled exception while sending/receiving keep alive packets.");
  308. await DisconnectInternalAsync().ConfigureAwait(false);
  309. }
  310. finally
  311. {
  312. MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets.");
  313. }
  314. }
  315. private async Task ReceivePackets(CancellationToken cancellationToken)
  316. {
  317. MqttTrace.Information(nameof(MqttClient), "Start receiving packets.");
  318. try
  319. {
  320. while (!cancellationToken.IsCancellationRequested)
  321. {
  322. var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
  323. if (cancellationToken.IsCancellationRequested)
  324. {
  325. return;
  326. }
  327. StartProcessReceivedPacket(packet, cancellationToken);
  328. }
  329. }
  330. catch (TaskCanceledException)
  331. {
  332. }
  333. catch (MqttCommunicationException exception)
  334. {
  335. if (cancellationToken.IsCancellationRequested)
  336. {
  337. return;
  338. }
  339. MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets.");
  340. await DisconnectInternalAsync().ConfigureAwait(false);
  341. }
  342. catch (Exception exception)
  343. {
  344. MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets.");
  345. await DisconnectInternalAsync().ConfigureAwait(false);
  346. }
  347. finally
  348. {
  349. MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets.");
  350. }
  351. }
  352. private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken)
  353. {
  354. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  355. Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken).ConfigureAwait(false);
  356. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  357. }
  358. private void StartReceivePackets(CancellationToken cancellationToken)
  359. {
  360. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  361. Task.Run(() => ReceivePackets(cancellationToken), cancellationToken).ConfigureAwait(false); ;
  362. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  363. }
  364. private void StartSendKeepAliveMessages(CancellationToken cancellationToken)
  365. {
  366. #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  367. Task.Run(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken).ConfigureAwait(false);
  368. #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
  369. }
  370. }
  371. }