You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

348 rivejä
15 KiB

  1. using System;
  2. using System.Diagnostics;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Packets;
  10. using MQTTnet.Protocol;
  11. using MQTTnet.Serializer;
  12. namespace MQTTnet.Server
  13. {
  14. public sealed class MqttClientSession : IDisposable
  15. {
  // Both stopwatches start running as soon as the session object is created.
  // The first is restarted for every received packet, the second only for
  // packets that are not MqttPingReqPacket (see ReceivePacketsAsync).
  private readonly Stopwatch _lastPacketReceivedTracker = Stopwatch.StartNew();
  private readonly Stopwatch _lastNonKeepAlivePacketReceivedTracker = Stopwatch.StartNew();

  // Collaborators handed in through the constructor.
  private readonly IMqttServerOptions _options;
  private readonly IMqttNetLogger _logger;
  private readonly MqttClientSessionsManager _sessionsManager;
  private readonly MqttRetainedMessagesManager _retainedMessagesManager;

  // Helpers created and owned by this session.
  private readonly MqttClientSubscriptionsManager _subscriptionsManager;
  private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;

  // Per-connection state: assigned by RunAsync, cleared by StopAsync.
  private IMqttChannelAdapter _adapter;
  private CancellationTokenSource _cancellationTokenSource;
  private MqttApplicationMessage _willMessage;

  /// <summary>
  /// Creates a session for the given client and wires up its subscription manager
  /// and pending-messages queue.
  /// </summary>
  /// <param name="clientId">Identifier exposed through <see cref="ClientId"/>; stored as given.</param>
  /// <param name="options">Server options; must not be <c>null</c>.</param>
  /// <param name="retainedMessagesManager">Source of retained messages; must not be <c>null</c>.</param>
  /// <param name="sessionsManager">Manager used to dispatch application messages; must not be <c>null</c>.</param>
  /// <param name="logger">Logger used by the session and its queue; must not be <c>null</c>.</param>
  /// <exception cref="ArgumentNullException">
  /// Any of <paramref name="options"/>, <paramref name="retainedMessagesManager"/>,
  /// <paramref name="sessionsManager"/> or <paramref name="logger"/> is <c>null</c>.
  /// </exception>
  public MqttClientSession(
      string clientId,
      IMqttServerOptions options,
      MqttRetainedMessagesManager retainedMessagesManager,
      MqttClientSessionsManager sessionsManager,
      IMqttNetLogger logger)
  {
      if (options == null)
      {
          throw new ArgumentNullException(nameof(options));
      }

      if (retainedMessagesManager == null)
      {
          throw new ArgumentNullException(nameof(retainedMessagesManager));
      }

      if (sessionsManager == null)
      {
          throw new ArgumentNullException(nameof(sessionsManager));
      }

      if (logger == null)
      {
          throw new ArgumentNullException(nameof(logger));
      }

      _options = options;
      _retainedMessagesManager = retainedMessagesManager;
      _sessionsManager = sessionsManager;
      _logger = logger;

      ClientId = clientId;

      _subscriptionsManager = new MqttClientSubscriptionsManager(_options);
      _pendingMessagesQueue = new MqttClientPendingMessagesQueue(_options, this, _logger);
  }
  /// <summary>Client identifier passed to the constructor.</summary>
  public string ClientId { get; }

  /// <summary>
  /// Protocol version reported by the current adapter's packet serializer,
  /// or <c>null</c> while no adapter is attached.
  /// </summary>
  public MqttProtocolVersion? ProtocolVersion
  {
      get { return _adapter?.PacketSerializer.ProtocolVersion; }
  }

  /// <summary>Time elapsed since the last-packet stopwatch was (re)started.</summary>
  public TimeSpan LastPacketReceived
  {
      get { return _lastPacketReceivedTracker.Elapsed; }
  }

  /// <summary>Time elapsed since the non-keep-alive stopwatch was (re)started.</summary>
  public TimeSpan LastNonKeepAlivePacketReceived
  {
      get { return _lastNonKeepAlivePacketReceivedTracker.Elapsed; }
  }

  /// <summary><c>true</c> while an adapter is attached to this session.</summary>
  public bool IsConnected
  {
      get { return _adapter != null; }
  }
  /// <summary>
  /// Attaches the adapter to this session, starts the pending-messages queue and the
  /// optional keep-alive watchdog, and then processes incoming packets until the
  /// receive loop ends.
  /// </summary>
  /// <param name="connectPacket">CONNECT packet of the client; supplies the will message and keep-alive period.</param>
  /// <param name="adapter">Channel adapter used to talk to the client.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="connectPacket"/> or <paramref name="adapter"/> is <c>null</c>.
  /// </exception>
  /// <remarks>
  /// Exceptions raised inside the <c>try</c> block are logged (or ignored for cancellation)
  /// and never propagate to the caller.
  /// </remarks>
  public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter)
  {
      if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));
      if (adapter == null) throw new ArgumentNullException(nameof(adapter));

      try
      {
          var cancellationTokenSource = new CancellationTokenSource();

          // Read the token exactly once, before the source is published to the field.
          // After publication a concurrent StopAsync may dispose the source, and
          // CancellationTokenSource.Token throws ObjectDisposedException on a disposed
          // instance. A token obtained earlier stays usable.
          var cancellationToken = cancellationTokenSource.Token;

          _willMessage = connectPacket.WillMessage;
          _adapter = adapter;
          _cancellationTokenSource = cancellationTokenSource;

          _pendingMessagesQueue.Start(adapter, cancellationToken);

          _lastPacketReceivedTracker.Restart();
          _lastNonKeepAlivePacketReceivedTracker.Restart();

          // A keep-alive period of 0 disables the watchdog.
          if (connectPacket.KeepAlivePeriod > 0)
          {
              StartCheckingKeepAliveTimeout(TimeSpan.FromSeconds(connectPacket.KeepAlivePeriod), cancellationToken);
          }

          await ReceivePacketsAsync(adapter, cancellationToken).ConfigureAwait(false);
      }
      catch (OperationCanceledException)
      {
          // Cancellation is the regular shutdown path; nothing to report.
      }
      catch (MqttCommunicationException exception)
      {
          _logger.Warning<MqttClientSession>(exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
      }
      catch (Exception exception)
      {
          _logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
      }
  }
  /// <summary>
  /// Stops the session: cancels and disposes the cancellation token source, disconnects
  /// the adapter and finally dispatches the will message if one is still pending.
  /// </summary>
  /// <remarks>
  /// This method is invoked from the receive loop as well as from the keep-alive task
  /// started via <c>Task.Run</c>, so two invocations can overlap. Each piece of
  /// per-connection state is therefore detached from its field with
  /// <see cref="Interlocked.Exchange{T}(ref T, T)"/>: exactly one caller gets the instance
  /// and performs the cleanup, all others see <c>null</c>.
  /// </remarks>
  public async Task StopAsync()
  {
      try
      {
          // Whoever wins the exchange owns cancel + dispose; a second caller can no
          // longer call Cancel on an already disposed source.
          var cancellationTokenSource = Interlocked.Exchange(ref _cancellationTokenSource, null);
          if (cancellationTokenSource == null)
          {
              return;
          }

          try
          {
              cancellationTokenSource.Cancel(false);
          }
          finally
          {
              // Dispose even when a cancellation callback throws.
              cancellationTokenSource.Dispose();
          }

          // Detach the adapter before disconnecting so the session does not keep
          // reporting IsConnected == true when DisconnectAsync throws.
          var adapter = Interlocked.Exchange(ref _adapter, null);
          if (adapter != null)
          {
              await adapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
          }

          _logger.Info<MqttClientSession>("Client '{0}': Session stopped.", ClientId);
      }
      finally
      {
          // Take-and-clear atomically so the will message is sent at most once,
          // even when StopAsync runs concurrently.
          var willMessage = Interlocked.Exchange(ref _willMessage, null);
          if (willMessage != null)
          {
              await _sessionsManager.DispatchApplicationMessageAsync(this, willMessage).ConfigureAwait(false);
          }
      }
  }
  /// <summary>
  /// Queues the application message for delivery to this client when the client's
  /// subscriptions match it; otherwise does nothing.
  /// </summary>
  /// <param name="applicationMessage">Message to deliver.</param>
  /// <exception cref="ArgumentNullException"><paramref name="applicationMessage"/> is <c>null</c>.</exception>
  public async Task EnqueueApplicationMessageAsync(MqttApplicationMessage applicationMessage)
  {
      if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

      // ConfigureAwait(false) like every other library await in this class: the
      // continuation does not need the caller's synchronization context.
      var result = await _subscriptionsManager.CheckSubscriptionsAsync(applicationMessage).ConfigureAwait(false);
      if (!result.IsSubscribed)
      {
          return;
      }

      // The packet is sent with the QoS level returned by the subscription check,
      // not with the level of the original message.
      var publishPacket = applicationMessage.ToPublishPacket();
      publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;

      _pendingMessagesQueue.Enqueue(publishPacket);
  }
  /// <summary>
  /// Releases the pending-messages queue and the cancellation token source of the
  /// current connection, if any.
  /// </summary>
  public void Dispose()
  {
      // The queue field is readonly and always assigned by the constructor.
      _pendingMessagesQueue.Dispose();

      // Detach the source from the field before disposing it. Leaving the disposed
      // instance in place would make a later StopAsync call Cancel on it, which
      // throws ObjectDisposedException.
      Interlocked.Exchange(ref _cancellationTokenSource, null)?.Dispose();
  }
  /// <summary>
  /// Reads packets from the adapter and processes them one at a time until
  /// cancellation is requested or an exception ends the loop.
  /// </summary>
  /// <param name="adapter">Adapter to read from.</param>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  /// <remarks>
  /// Cancellation is swallowed silently. Any other exception is logged and the
  /// session is stopped.
  /// </remarks>
  private async Task ReceivePacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  {
      try
      {
          while (true)
          {
              if (cancellationToken.IsCancellationRequested)
              {
                  break;
              }

              var receivedPacket = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);

              // Every packet counts as a sign of life; only non-ping packets count as activity.
              _lastPacketReceivedTracker.Restart();

              var isKeepAlivePacket = receivedPacket is MqttPingReqPacket;
              if (!isKeepAlivePacket)
              {
                  _lastNonKeepAlivePacketReceivedTracker.Restart();
              }

              await ProcessReceivedPacketAsync(adapter, receivedPacket, cancellationToken).ConfigureAwait(false);
          }
      }
      catch (OperationCanceledException)
      {
          // Regular shutdown path.
      }
      catch (Exception exception)
      {
          // Communication problems are expected to happen and only warrant a warning;
          // everything else is reported as an error. Both end the session.
          if (exception is MqttCommunicationException communicationException)
          {
              _logger.Warning<MqttClientSession>(communicationException, "Client '{0}': Communication exception while processing client packets.", ClientId);
          }
          else
          {
              _logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
          }

          await StopAsync().ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Dispatches a received packet to the handler for its type.
  /// </summary>
  /// <param name="adapter">Adapter used to send responses.</param>
  /// <param name="packet">Packet received from the client.</param>
  /// <param name="cancellationToken">Token passed on to send operations.</param>
  /// <returns>A task that completes when the packet has been handled.</returns>
  /// <remarks>
  /// Packets of a type that is not handled here cause a warning and stop the session.
  /// </remarks>
  private Task ProcessReceivedPacketAsync(IMqttChannelAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
  {
      if (packet is MqttPublishPacket publishPacket)
      {
          return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
      }

      if (packet is MqttPingReqPacket)
      {
          return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
      }

      if (packet is MqttPubRelPacket pubRelPacket)
      {
          return HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken);
      }

      if (packet is MqttPubRecPacket pubRecPacket)
      {
          return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>());
      }

      if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
      {
          // Discard message.
          return Task.FromResult(0);
      }

      if (packet is MqttSubscribePacket subscribePacket)
      {
          return HandleIncomingSubscribePacketAsync(adapter, subscribePacket, cancellationToken);
      }

      if (packet is MqttUnsubscribePacket unsubscribePacket)
      {
          return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken);
      }

      if (packet is MqttDisconnectPacket)
      {
          // Clean disconnect: the will message must be discarded without being
          // published [MQTT-3.14.4-3]. StopAsync dispatches whatever is still stored
          // in _willMessage, so it has to be cleared first.
          _willMessage = null;
          return StopAsync();
      }

      if (packet is MqttConnectPacket)
      {
          // A second CONNECT on an established connection is a protocol violation
          // [MQTT-3.1.0-2]. The connection is closed and the will message stays in place.
          return StopAsync();
      }

      _logger.Warning<MqttClientSession>("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
      return StopAsync();
  }
  /// <summary>
  /// Registers the subscriptions of a SUBSCRIBE packet, sends the response packet and
  /// then enqueues the retained messages matching the new subscriptions.
  /// </summary>
  /// <param name="adapter">Adapter used to send the response.</param>
  /// <param name="subscribePacket">SUBSCRIBE packet received from the client.</param>
  /// <param name="cancellationToken">Token passed on to send operations.</param>
  /// <remarks>
  /// When the subscribe result requests closing the connection, a DISCONNECT packet is
  /// sent, the session is stopped and no retained messages are enqueued.
  /// </remarks>
  private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
  {
      var subscribeResult = await _subscriptionsManager.SubscribeAsync(subscribePacket, ClientId).ConfigureAwait(false);
      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false);

      if (subscribeResult.CloseConnection)
      {
          await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false);
          await StopAsync().ConfigureAwait(false);

          // The session has just been stopped; queueing retained messages for it
          // would be pointless.
          return;
      }

      await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
  }
  /// <summary>
  /// Removes the subscriptions named in an UNSUBSCRIBE packet and sends the packet
  /// produced by the subscriptions manager back to the client.
  /// </summary>
  /// <param name="adapter">Adapter used to send the response.</param>
  /// <param name="unsubscribePacket">UNSUBSCRIBE packet received from the client.</param>
  /// <param name="cancellationToken">Token passed on to the send operation.</param>
  private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
  {
      // ConfigureAwait(false) on both awaits, matching the subscribe handler.
      var unsubscribeResult = await _subscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult).ConfigureAwait(false);
  }
  /// <summary>
  /// Fetches the retained messages for the given SUBSCRIBE packet from the retained
  /// messages manager and enqueues each of them for this client.
  /// </summary>
  /// <param name="subscribePacket">SUBSCRIBE packet whose retained messages are requested.</param>
  private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
  {
      var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket).ConfigureAwait(false);

      // The items are handed to EnqueueApplicationMessageAsync one by one, in
      // enumeration order.
      foreach (var retainedMessage in retainedMessages)
      {
          await EnqueueApplicationMessageAsync(retainedMessage).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Converts a PUBLISH packet into an application message and routes it according
  /// to the message's quality-of-service level.
  /// </summary>
  /// <param name="adapter">Adapter used to send acknowledgements.</param>
  /// <param name="publishPacket">PUBLISH packet received from the client.</param>
  /// <param name="cancellationToken">Token passed on to send operations.</param>
  /// <exception cref="MqttCommunicationException">The QoS level is none of the three known values.</exception>
  private Task HandleIncomingPublishPacketAsync(IMqttChannelAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  {
      var applicationMessage = publishPacket.ToApplicationMessage();
      var qualityOfServiceLevel = applicationMessage.QualityOfServiceLevel;

      // QoS 0 needs no acknowledgement, so the message is dispatched directly.
      if (qualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
      {
          return _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage);
      }

      if (qualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
      {
          return HandleIncomingPublishPacketWithQoS1(adapter, applicationMessage, publishPacket, cancellationToken);
      }

      if (qualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
      {
          return HandleIncomingPublishPacketWithQoS2(adapter, applicationMessage, publishPacket, cancellationToken);
      }

      throw new MqttCommunicationException("Received a not supported QoS level.");
  }
  /// <summary>
  /// Handles a QoS 1 publish: dispatches the message, then acknowledges it with a
  /// PUBACK carrying the packet identifier of the PUBLISH packet.
  /// </summary>
  /// <param name="adapter">Adapter used to send the PUBACK.</param>
  /// <param name="applicationMessage">Message converted from the PUBLISH packet.</param>
  /// <param name="publishPacket">Original PUBLISH packet; supplies the packet identifier.</param>
  /// <param name="cancellationToken">Token passed on to the send operation.</param>
  private async Task HandleIncomingPublishPacketWithQoS1(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  {
      await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false);

      var pubAckPacket = new MqttPubAckPacket();
      pubAckPacket.PacketIdentifier = publishPacket.PacketIdentifier;

      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubAckPacket).ConfigureAwait(false);
  }
  /// <summary>
  /// Handles a QoS 2 publish: dispatches the message, then answers with a PUBREC
  /// carrying the packet identifier of the PUBLISH packet.
  /// </summary>
  /// <param name="adapter">Adapter used to send the PUBREC.</param>
  /// <param name="applicationMessage">Message converted from the PUBLISH packet.</param>
  /// <param name="publishPacket">Original PUBLISH packet; supplies the packet identifier.</param>
  /// <param name="cancellationToken">Token passed on to the send operation.</param>
  private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
  {
      // QoS 2 is implemented as method "B" [4.3.3 QoS 2: Exactly once delivery].
      await _sessionsManager.DispatchApplicationMessageAsync(this, applicationMessage).ConfigureAwait(false);

      var pubRecPacket = new MqttPubRecPacket();
      pubRecPacket.PacketIdentifier = publishPacket.PacketIdentifier;

      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket).ConfigureAwait(false);
  }
  /// <summary>
  /// Answers a PUBREL packet with a PUBCOMP packet that carries the same packet identifier.
  /// </summary>
  /// <param name="adapter">Adapter used to send the PUBCOMP.</param>
  /// <param name="pubRelPacket">PUBREL packet received from the client.</param>
  /// <param name="cancellationToken">Token passed on to the send operation.</param>
  /// <returns>The task of the send operation.</returns>
  private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
  {
      var pubCompPacket = new MqttPubCompPacket();
      pubCompPacket.PacketIdentifier = pubRelPacket.PacketIdentifier;

      var sendTimeout = _options.DefaultCommunicationTimeout;
      return adapter.SendPacketsAsync(sendTimeout, cancellationToken, pubCompPacket);
  }
  /// <summary>
  /// Starts the keep-alive watchdog via <c>Task.Run</c> without waiting for it.
  /// </summary>
  /// <param name="keepAlivePeriod">Keep-alive period announced by the client.</param>
  /// <param name="cancellationToken">Token that ends the watchdog.</param>
  private void StartCheckingKeepAliveTimeout(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
  {
      // Fire and forget: the returned task is intentionally not observed, because
      // CheckKeepAliveTimeoutAsync handles its own exceptions.
      Func<Task> watchdog = () => CheckKeepAliveTimeoutAsync(keepAlivePeriod, cancellationToken);
      Task.Run(watchdog, cancellationToken);
  }
  /// <summary>
  /// Checks once per keep-alive period whether a packet has been received recently
  /// and stops the session when the client has been silent for more than 1.5 times
  /// the keep-alive period.
  /// </summary>
  /// <param name="keepAlivePeriod">Keep-alive period; also used as the polling interval.</param>
  /// <param name="cancellationToken">Token that ends the loop.</param>
  /// <remarks>
  /// Cancellation is swallowed, other exceptions are logged, and a trace entry is
  /// written whenever the method ends.
  /// </remarks>
  private async Task CheckKeepAliveTimeoutAsync(TimeSpan keepAlivePeriod, CancellationToken cancellationToken)
  {
      try
      {
          // Values described here: [MQTT-3.1.2-24].
          var allowedSilenceInSeconds = keepAlivePeriod.TotalSeconds * 1.5D;

          while (true)
          {
              if (cancellationToken.IsCancellationRequested)
              {
                  break;
              }

              var hasTimedOut = _lastPacketReceivedTracker.Elapsed.TotalSeconds > allowedSilenceInSeconds;
              if (!hasTimedOut)
              {
                  await Task.Delay(keepAlivePeriod, cancellationToken);
                  continue;
              }

              _logger.Warning<MqttClientSession>("Client '{0}': Did not receive any packet or keep alive signal.", ClientId);
              await StopAsync();
              break;
          }
      }
      catch (OperationCanceledException)
      {
          // Regular shutdown path.
      }
      catch (Exception exception)
      {
          _logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while checking keep alive timeouts.", ClientId);
      }
      finally
      {
          _logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", ClientId);
      }
  }
  289. }
  290. }