You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

239 rivejä
10 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. using MQTTnet.Core.Serializer;
  namespace MQTTnet.Core.Server
  {
      /// <summary>
      /// Server-side session of a single MQTT client. It receives packets from the client's
      /// communication adapter, answers them, and forwards published application messages
      /// to the sessions manager. Outgoing publish packets are buffered in a pending messages queue.
      /// </summary>
      public sealed class MqttClientSession : IDisposable
      {
          // Packet identifiers of QoS 2 PUBLISH packets for which no PUBREL has been received yet.
          // Every access is guarded by a lock on the set itself.
          private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
          private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
          private readonly MqttClientSessionsManager _mqttClientSessionsManager;
          private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
          private readonly MqttServerOptions _options;
          private readonly MqttNetTrace _trace;

          // The fields below are only set while a connection is being served; Stop() resets them to null.
          private IMqttCommunicationAdapter _adapter;
          private CancellationTokenSource _cancellationTokenSource;
          private MqttApplicationMessage _willMessage;

          /// <summary>
          /// Creates a session for the client with the given identifier.
          /// </summary>
          /// <param name="clientId">Identifier of the client; stored as-is and used in trace messages.</param>
          /// <param name="options">Server options (communication timeout, application message interceptor).</param>
          /// <param name="mqttClientSessionsManager">Manager used to dispatch application messages and to access retained messages.</param>
          /// <param name="trace">Trace sink for diagnostics.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="trace"/>, <paramref name="options"/> or <paramref name="mqttClientSessionsManager"/> is null.
          /// </exception>
          public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager, MqttNetTrace trace)
          {
              _trace = trace ?? throw new ArgumentNullException(nameof(trace));
              ClientId = clientId;
              _options = options ?? throw new ArgumentNullException(nameof(options));
              _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager));
              _pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this, trace);
          }

          /// <summary>Identifier of the client this session belongs to.</summary>
          public string ClientId { get; }

          /// <summary>Protocol version of the current adapter's packet serializer, or null when no adapter is attached.</summary>
          public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;

          /// <summary>True while an adapter is attached, i.e. between <see cref="RunAsync"/> and <see cref="Stop"/>.</summary>
          public bool IsConnected => _adapter != null;

          /// <summary>
          /// Attaches the adapter to this session, starts the pending messages queue and processes
          /// incoming packets until the session is stopped or the receive loop fails.
          /// Exceptions other than the argument check are traced and not propagated.
          /// </summary>
          /// <param name="willMessage">Message dispatched by <see cref="Stop"/>; may be null.</param>
          /// <param name="adapter">Communication adapter of the connected client.</param>
          /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is null.</exception>
          public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
          {
              if (adapter == null) throw new ArgumentNullException(nameof(adapter));

              _willMessage = willMessage;
              try
              {
                  // Capture the token before the source is published to the field: Stop() may null
                  // and dispose the field at any time, and reading Token from a disposed source throws.
                  var cancellationTokenSource = new CancellationTokenSource();
                  var cancellationToken = cancellationTokenSource.Token;

                  _adapter = adapter;
                  _cancellationTokenSource = cancellationTokenSource;

                  _pendingMessagesQueue.Start(adapter, cancellationToken);
                  await ReceivePacketsAsync(adapter, cancellationToken).ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  // Cancellation is the regular way of ending the session.
              }
              catch (MqttCommunicationException exception)
              {
                  _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
              }
              catch (Exception exception)
              {
                  _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
              }
          }

          /// <summary>
          /// Ends the current connection: dispatches the pending will message (if any), cancels
          /// the running operations and detaches the adapter. May be called repeatedly and after
          /// <see cref="Dispose"/>; the will message is dispatched at most once.
          /// </summary>
          public void Stop()
          {
              // Take the will message atomically so that repeated or concurrent calls
              // (for example from the receive loop's error handling) cannot publish it twice.
              var willMessage = Interlocked.Exchange(ref _willMessage, null);
              if (willMessage != null)
              {
                  _mqttClientSessionsManager.DispatchApplicationMessage(this, willMessage);
              }

              CancelAndReleaseTokenSource();
              _adapter = null;

              _trace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
          }

          /// <summary>
          /// Enqueues the packet for delivery to the client if the client is subscribed to it;
          /// otherwise the packet is ignored.
          /// </summary>
          /// <param name="publishPacket">Packet to deliver.</param>
          /// <exception cref="ArgumentNullException"><paramref name="publishPacket"/> is null.</exception>
          public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
          {
              if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

              if (!_subscriptionsManager.IsSubscribed(publishPacket))
              {
                  return;
              }

              _pendingMessagesQueue.Enqueue(publishPacket);
              _trace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId);
          }

          /// <summary>
          /// Cancels and disposes the cancellation token source of the current connection.
          /// Unlike <see cref="Stop"/>, no will message is dispatched and the adapter stays attached.
          /// </summary>
          public void Dispose()
          {
              CancelAndReleaseTokenSource();
          }

          /// <summary>
          /// Detaches the current cancellation token source from the session, then cancels and
          /// disposes it. Does nothing when no source is attached.
          /// </summary>
          private void CancelAndReleaseTokenSource()
          {
              // Swapping the field first guarantees that every source is cancelled and disposed
              // exactly once, so Stop() after Dispose() no longer hits a disposed instance.
              var cancellationTokenSource = Interlocked.Exchange(ref _cancellationTokenSource, null);
              if (cancellationTokenSource == null)
              {
                  return;
              }

              try
              {
                  cancellationTokenSource.Cancel(false);
              }
              finally
              {
                  cancellationTokenSource.Dispose();
              }
          }

          /// <summary>
          /// Receives and processes packets until cancellation is requested. Communication and
          /// unexpected exceptions are traced and stop the session; cancellation ends the loop silently.
          /// </summary>
          private async Task ReceivePacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
          {
              try
              {
                  while (!cancellationToken.IsCancellationRequested)
                  {
                      var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
                      await ProcessReceivedPacketAsync(adapter, packet, cancellationToken).ConfigureAwait(false);
                  }
              }
              catch (OperationCanceledException)
              {
              }
              catch (MqttCommunicationException exception)
              {
                  _trace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
                  Stop();
              }
              catch (Exception exception)
              {
                  _trace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
                  Stop();
              }
          }

          /// <summary>
          /// Handles one received packet according to its type. Packets that are not supported
          /// (including a second CONNECT) stop the session.
          /// </summary>
          /// <remarks>
          /// The token is passed in instead of being read from the field because
          /// <see cref="Stop"/> resets the field to null while a packet may still be in flight.
          /// </remarks>
          private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
          {
              if (packet is MqttSubscribePacket subscribePacket)
              {
                  await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Subscribe(subscribePacket)).ConfigureAwait(false);
                  EnqueueRetainedMessages(subscribePacket);
              }
              else if (packet is MqttUnsubscribePacket unsubscribePacket)
              {
                  await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)).ConfigureAwait(false);
              }
              else if (packet is MqttPublishPacket publishPacket)
              {
                  await HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken).ConfigureAwait(false);
              }
              else if (packet is MqttPubRelPacket pubRelPacket)
              {
                  await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken).ConfigureAwait(false);
              }
              else if (packet is MqttPubRecPacket pubRecPacket)
              {
                  await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
              }
              else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
              {
                  // Discard message.
              }
              else if (packet is MqttPingReqPacket)
              {
                  await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket()).ConfigureAwait(false);
              }
              else if (packet is MqttDisconnectPacket)
              {
                  // A DISCONNECT packet is a clean shutdown: the will message must be discarded
                  // and not published [MQTT 3.1.1, 3.14.4].
                  _willMessage = null;
                  Stop();
              }
              else if (packet is MqttConnectPacket)
              {
                  // A CONNECT packet on an already running session ends the connection.
                  Stop();
              }
              else
              {
                  _trace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
                  Stop();
              }
          }

          /// <summary>
          /// Enqueues the retained messages returned by the retained messages manager for the
          /// given subscribe packet.
          /// </summary>
          private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
          {
              var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
              foreach (var retainedMessage in retainedMessages)
              {
                  EnqueuePublishPacket(retainedMessage.ToPublishPacket());
              }
          }

          /// <summary>
          /// Processes a PUBLISH packet sent by the client: runs the optional interceptor, hands
          /// retained messages to the retained messages manager, dispatches the message and sends
          /// the acknowledgement required by the message's QoS level.
          /// </summary>
          /// <exception cref="MqttCommunicationException">The QoS level is not supported.</exception>
          private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
          {
              var applicationMessage = publishPacket.ToApplicationMessage();

              _options.ApplicationMessageInterceptor?.Invoke(applicationMessage);

              if (applicationMessage.Retain)
              {
                  await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage).ConfigureAwait(false);
              }

              switch (applicationMessage.QualityOfServiceLevel)
              {
                  case MqttQualityOfServiceLevel.AtMostOnce:
                      {
                          _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
                          return;
                      }
                  case MqttQualityOfServiceLevel.AtLeastOnce:
                      {
                          _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);

                          await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken,
                              new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);

                          return;
                      }
                  case MqttQualityOfServiceLevel.ExactlyOnce:
                      {
                          // QoS 2 is implemented as method "B" [4.3.3 QoS 2: Exactly once delivery]:
                          // a PUBLISH whose packet identifier is still waiting for its PUBREL is a
                          // retransmission. It must be acknowledged again but not delivered again.
                          bool isFirstDelivery;
                          lock (_unacknowledgedPublishPackets)
                          {
                              isFirstDelivery = _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
                          }

                          if (isFirstDelivery)
                          {
                              _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
                          }

                          await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken,
                              new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);

                          return;
                      }
                  default:
                      throw new MqttCommunicationException("Received a not supported QoS level.");
              }
          }

          /// <summary>
          /// Completes a QoS 2 exchange: forgets the packet identifier and answers the PUBREL
          /// with a PUBCOMP carrying the same identifier.
          /// </summary>
          private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
          {
              lock (_unacknowledgedPublishPackets)
              {
                  _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
              }

              return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
          }
      }
  }