Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 
 

228 rindas
9.5 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. using MQTTnet.Core.Serializer;
  namespace MQTTnet.Core.Server
  {
      /// <summary>
      /// Server-side session of a single MQTT client. It receives the packets the client
      /// sends through a communication adapter, answers them, and forwards publish packets
      /// to the sessions manager. Outgoing publish packets are buffered in a pending
      /// messages queue.
      /// </summary>
      public sealed class MqttClientSession : IDisposable
      {
          // Packet identifiers of QoS 2 publishes that were received but not yet released (PUBREL).
          private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
          private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
          private readonly MqttClientSessionsManager _mqttClientSessionsManager;
          private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
          private readonly MqttServerOptions _options;

          private IMqttCommunicationAdapter _adapter;
          private CancellationTokenSource _cancellationTokenSource;
          private MqttApplicationMessage _willMessage;

          /// <summary>
          /// Creates a session for the given client.
          /// </summary>
          /// <param name="clientId">Identifier of the client; used in trace messages and retained message handling.</param>
          /// <param name="options">Server options (e.g. the default communication timeout).</param>
          /// <param name="mqttClientSessionsManager">Manager used to dispatch publish packets and to access retained messages.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="options"/> or <paramref name="mqttClientSessionsManager"/> is null.
          /// </exception>
          public MqttClientSession(string clientId, MqttServerOptions options, MqttClientSessionsManager mqttClientSessionsManager)
          {
              ClientId = clientId;
              _options = options ?? throw new ArgumentNullException(nameof(options));
              _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager));
              _pendingMessagesQueue = new MqttClientPendingMessagesQueue(options, this);
          }

          /// <summary>Identifier of the client this session belongs to.</summary>
          public string ClientId { get; }

          /// <summary>Protocol version of the current adapter's serializer, or null when no adapter is attached.</summary>
          public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;

          /// <summary>True while an adapter is attached to this session.</summary>
          public bool IsConnected => _adapter != null;

          /// <summary>
          /// Attaches the adapter, starts the pending messages queue and processes incoming
          /// packets until the session is stopped or the connection fails.
          /// Exceptions are traced and swallowed; the returned task does not fault because of them.
          /// </summary>
          /// <param name="willMessage">Will message to publish when the session stops; may be null.</param>
          /// <param name="adapter">Adapter used to communicate with the client.</param>
          /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is null.</exception>
          public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
          {
              if (adapter == null) throw new ArgumentNullException(nameof(adapter));

              _willMessage = willMessage;
              try
              {
                  var cancellationTokenSource = new CancellationTokenSource();

                  // Capture the token once: Stop() clears the field, so reading the field
                  // again later could hit null (or a disposed source).
                  var cancellationToken = cancellationTokenSource.Token;

                  _adapter = adapter;
                  _cancellationTokenSource = cancellationTokenSource;

                  _pendingMessagesQueue.Start(adapter, cancellationToken);
                  await ReceivePacketsAsync(adapter, cancellationToken).ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  // Expected when the session is stopped.
              }
              catch (MqttCommunicationException exception)
              {
                  MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
              }
              catch (Exception exception)
              {
                  MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
              }
          }

          /// <summary>
          /// Stops the session: publishes the will message (if one is still pending), cancels
          /// the running receive loop and detaches the adapter. Safe to call more than once;
          /// the will message is published at most once.
          /// </summary>
          public void Stop()
          {
              // Take the will message atomically so repeated or concurrent Stop() calls
              // cannot publish it twice.
              var willMessage = Interlocked.Exchange(ref _willMessage, null);
              if (willMessage != null)
              {
                  _mqttClientSessionsManager.DispatchPublishPacket(this, willMessage.ToPublishPacket());
              }

              // Detach the source before cancelling/disposing so neither Stop() nor Dispose()
              // can touch an already disposed instance.
              var cancellationTokenSource = Interlocked.Exchange(ref _cancellationTokenSource, null);
              if (cancellationTokenSource != null)
              {
                  cancellationTokenSource.Cancel(false);
                  cancellationTokenSource.Dispose();
              }

              _adapter = null;

              MqttNetTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
          }

          /// <summary>
          /// Queues a publish packet for delivery to this client if the client is subscribed
          /// to it; otherwise the packet is ignored.
          /// </summary>
          /// <param name="publishPacket">The packet to deliver.</param>
          /// <exception cref="ArgumentNullException"><paramref name="publishPacket"/> is null.</exception>
          public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
          {
              if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

              if (!_subscriptionsManager.IsSubscribed(publishPacket))
              {
                  return;
              }

              _pendingMessagesQueue.Enqueue(publishPacket);
              MqttNetTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId);
          }

          /// <summary>
          /// Cancels and disposes the cancellation token source of a running session.
          /// Does not publish the will message. Safe to call more than once and in any
          /// order relative to <see cref="Stop"/>.
          /// </summary>
          public void Dispose()
          {
              var cancellationTokenSource = Interlocked.Exchange(ref _cancellationTokenSource, null);
              if (cancellationTokenSource == null)
              {
                  return;
              }

              try
              {
                  cancellationTokenSource.Cancel();
              }
              finally
              {
                  cancellationTokenSource.Dispose();
              }
          }

          /// <summary>
          /// Receive loop: reads packets from the adapter and processes them until
          /// cancellation is requested. Any failure other than cancellation is traced
          /// and stops the session.
          /// </summary>
          private async Task ReceivePacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
          {
              try
              {
                  while (!cancellationToken.IsCancellationRequested)
                  {
                      // NOTE(review): TimeSpan.Zero presumably means "no receive timeout" - confirm against the adapter.
                      var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
                      await ProcessReceivedPacketAsync(adapter, packet, cancellationToken).ConfigureAwait(false);
                  }
              }
              catch (OperationCanceledException)
              {
                  // Expected when the session is stopped.
              }
              catch (MqttCommunicationException exception)
              {
                  MqttNetTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
                  Stop();
              }
              catch (Exception exception)
              {
                  MqttNetTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
                  Stop();
              }
          }

          /// <summary>
          /// Dispatches a received packet to the matching handler. Unsupported packets
          /// (including null) are traced and close the connection.
          /// </summary>
          /// <param name="adapter">Adapter used to send responses.</param>
          /// <param name="packet">The packet received from the client.</param>
          /// <param name="cancellationToken">
          /// Token of the running session. Passed in explicitly because the token source
          /// field is cleared by <see cref="Stop"/>.
          /// </param>
          private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
          {
              switch (packet)
              {
                  case MqttSubscribePacket subscribePacket:
                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Subscribe(subscribePacket)).ConfigureAwait(false);
                      EnqueueRetainedMessages(subscribePacket);
                      break;

                  case MqttUnsubscribePacket unsubscribePacket:
                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket)).ConfigureAwait(false);
                      break;

                  case MqttPublishPacket publishPacket:
                      await HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken).ConfigureAwait(false);
                      break;

                  case MqttPubRelPacket pubRelPacket:
                      await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken).ConfigureAwait(false);
                      break;

                  case MqttPubRecPacket pubRecPacket:
                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
                      break;

                  case MqttPubAckPacket _:
                  case MqttPubCompPacket _:
                      // Discard message.
                      break;

                  case MqttPingReqPacket _:
                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket()).ConfigureAwait(false);
                      break;

                  case MqttDisconnectPacket _:
                      // A DISCONNECT packet is a clean shutdown: the will message must be
                      // discarded without being published (MQTT 3.1.1, MQTT-3.14.4-3).
                      _willMessage = null;
                      Stop();
                      break;

                  case MqttConnectPacket _:
                      // A CONNECT on an already running session closes the connection;
                      // the will message is still published by Stop().
                      Stop();
                      break;

                  default:
                      MqttNetTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
                      Stop();
                      break;
              }
          }

          /// <summary>
          /// Queues the retained messages returned by the retained messages manager for the
          /// given subscribe packet.
          /// </summary>
          private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
          {
              var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
              foreach (var publishPacket in retainedMessages)
              {
                  EnqueuePublishPacket(publishPacket);
              }
          }

          /// <summary>
          /// Handles a PUBLISH from the client: hands retained messages to the retained
          /// messages manager, dispatches the packet to the sessions manager and sends the
          /// acknowledgement required by the packet's QoS level.
          /// </summary>
          /// <exception cref="MqttCommunicationException">The QoS level is not supported.</exception>
          private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
          {
              if (publishPacket.Retain)
              {
                  await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket);
              }

              switch (publishPacket.QualityOfServiceLevel)
              {
                  case MqttQualityOfServiceLevel.AtMostOnce:
                      _mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
                      return;

                  case MqttQualityOfServiceLevel.AtLeastOnce:
                      _mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);
                      return;

                  case MqttQualityOfServiceLevel.ExactlyOnce:
                      // QoS 2 is implemented as method "B" [4.3.3 QoS 2: Exactly once delivery].
                      bool isFirstDelivery;
                      lock (_unacknowledgedPublishPackets)
                      {
                          // Add returns false when the identifier is already tracked, i.e. the
                          // client re-sent a PUBLISH that has not been released yet.
                          isFirstDelivery = _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
                      }

                      // A re-sent PUBLISH must be acknowledged again but must not be delivered
                      // to onward recipients a second time (MQTT 3.1.1, MQTT-4.3.3-2).
                      if (isFirstDelivery)
                      {
                          _mqttClientSessionsManager.DispatchPublishPacket(this, publishPacket);
                      }

                      await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }).ConfigureAwait(false);
                      return;

                  default:
                      throw new MqttCommunicationException("Received a not supported QoS level.");
              }
          }

          /// <summary>
          /// Handles a PUBREL from the client: forgets the packet identifier of the released
          /// QoS 2 publish and answers with PUBCOMP.
          /// </summary>
          private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
          {
              lock (_unacknowledgedPublishPackets)
              {
                  _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
              }

              return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
          }
      }
  }