Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

244 linhas
10 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Exceptions;
  7. using MQTTnet.Core.Internal;
  8. using MQTTnet.Core.Packets;
  9. using MQTTnet.Core.Protocol;
  10. using MQTTnet.Core.Serializer;
  11. using Microsoft.Extensions.Logging;
  12. namespace MQTTnet.Core.Server
  13. {
  14. public sealed class MqttClientSession : IDisposable
  15. {
  16. private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
  17. private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
  18. private readonly MqttClientSessionsManager _mqttClientSessionsManager;
  19. private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
  20. private readonly MqttServerOptions _options;
  21. private readonly ILogger<MqttClientSession> _logger;
  22. private IMqttCommunicationAdapter _adapter;
  23. private CancellationTokenSource _cancellationTokenSource;
  24. private MqttApplicationMessage _willMessage;
  25. public MqttClientSession(string clientId, MqttClientSessionsManager mqttClientSessionsManager, ILogger<MqttClientSession> logger, ILogger<MqttClientPendingMessagesQueue> msgQueueLogger)
  26. {
  27. _mqttClientSessionsManager = mqttClientSessionsManager ?? throw new ArgumentNullException(nameof(mqttClientSessionsManager));
  28. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  29. ClientId = clientId;
  30. _options = mqttClientSessionsManager.Options;
  31. _pendingMessagesQueue = new MqttClientPendingMessagesQueue(mqttClientSessionsManager.Options, this, msgQueueLogger);
  32. }
  33. public string ClientId { get; }
  34. public MqttProtocolVersion? ProtocolVersion => _adapter?.PacketSerializer.ProtocolVersion;
  35. public bool IsConnected => _adapter != null;
  36. public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
  37. {
  38. if (adapter == null) throw new ArgumentNullException(nameof(adapter));
  39. try
  40. {
  41. _willMessage = willMessage;
  42. _adapter = adapter;
  43. _cancellationTokenSource = new CancellationTokenSource();
  44. _pendingMessagesQueue.Start(adapter, _cancellationTokenSource.Token);
  45. await ReceivePacketsAsync(adapter, _cancellationTokenSource.Token);
  46. }
  47. catch (OperationCanceledException)
  48. {
  49. }
  50. catch (MqttCommunicationException exception)
  51. {
  52. _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
  53. }
  54. catch (Exception exception)
  55. {
  56. _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
  57. }
  58. }
  59. public void Stop()
  60. {
  61. try
  62. {
  63. _cancellationTokenSource?.Cancel(false);
  64. _cancellationTokenSource?.Dispose();
  65. _cancellationTokenSource = null;
  66. _adapter = null;
  67. _logger.LogInformation("Client '{0}': Disconnected.", ClientId);
  68. }
  69. finally
  70. {
  71. if (_willMessage != null)
  72. {
  73. _mqttClientSessionsManager.DispatchApplicationMessage(this, _willMessage);
  74. }
  75. }
  76. }
  77. public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
  78. {
  79. if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
  80. if (!_subscriptionsManager.IsSubscribed(publishPacket))
  81. {
  82. return;
  83. }
  84. _pendingMessagesQueue.Enqueue(publishPacket);
  85. _logger.LogTrace("Client '{0}': Enqueued pending publish packet.", ClientId);
  86. }
  87. public void Dispose()
  88. {
  89. _cancellationTokenSource?.Cancel();
  90. _cancellationTokenSource?.Dispose();
  91. }
  92. private async Task ReceivePacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
  93. {
  94. try
  95. {
  96. while (!cancellationToken.IsCancellationRequested)
  97. {
  98. var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
  99. await ProcessReceivedPacketAsync(adapter, packet).ConfigureAwait(false);
  100. }
  101. }
  102. catch (OperationCanceledException)
  103. {
  104. }
  105. catch (MqttCommunicationException exception)
  106. {
  107. _logger.LogWarning(new EventId(), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
  108. Stop();
  109. }
  110. catch (Exception exception)
  111. {
  112. _logger.LogError(new EventId(), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
  113. Stop();
  114. }
  115. }
  116. private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet)
  117. {
  118. if (packet is MqttSubscribePacket subscribePacket)
  119. {
  120. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Subscribe(subscribePacket));
  121. EnqueueRetainedMessages(subscribePacket);
  122. }
  123. else if (packet is MqttUnsubscribePacket unsubscribePacket)
  124. {
  125. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, _subscriptionsManager.Unsubscribe(unsubscribePacket));
  126. }
  127. else if (packet is MqttPublishPacket publishPacket)
  128. {
  129. await HandleIncomingPublishPacketAsync(adapter, publishPacket);
  130. }
  131. else if (packet is MqttPubRelPacket pubRelPacket)
  132. {
  133. await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket);
  134. }
  135. else if (packet is MqttPubRecPacket pubRecPacket)
  136. {
  137. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, pubRecPacket.CreateResponse<MqttPubRelPacket>());
  138. }
  139. else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
  140. {
  141. // Discard message.
  142. }
  143. else if (packet is MqttPingReqPacket)
  144. {
  145. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPingRespPacket());
  146. }
  147. else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
  148. {
  149. Stop();
  150. }
  151. else
  152. {
  153. _logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
  154. Stop();
  155. }
  156. }
  157. private void EnqueueRetainedMessages(MqttSubscribePacket subscribePacket)
  158. {
  159. var retainedMessages = _mqttClientSessionsManager.RetainedMessagesManager.GetMessages(subscribePacket);
  160. foreach (var publishPacket in retainedMessages)
  161. {
  162. EnqueuePublishPacket(publishPacket.ToPublishPacket());
  163. }
  164. }
  165. private async Task HandleIncomingPublishPacketAsync(IMqttCommunicationAdapter adapter, MqttPublishPacket publishPacket)
  166. {
  167. var applicationMessage = publishPacket.ToApplicationMessage();
  168. _options.ApplicationMessageInterceptor?.Invoke(applicationMessage);
  169. if (applicationMessage.Retain)
  170. {
  171. await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, applicationMessage);
  172. }
  173. switch (applicationMessage.QualityOfServiceLevel)
  174. {
  175. case MqttQualityOfServiceLevel.AtMostOnce:
  176. {
  177. _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
  178. return;
  179. }
  180. case MqttQualityOfServiceLevel.AtLeastOnce:
  181. {
  182. _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
  183. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
  184. new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  185. return;
  186. }
  187. case MqttQualityOfServiceLevel.ExactlyOnce:
  188. {
  189. // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
  190. lock (_unacknowledgedPublishPackets)
  191. {
  192. _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
  193. }
  194. _mqttClientSessionsManager.DispatchApplicationMessage(this, applicationMessage);
  195. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token,
  196. new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
  197. return;
  198. }
  199. default:
  200. throw new MqttCommunicationException("Received a not supported QoS level.");
  201. }
  202. }
  203. private Task HandleIncomingPubRelPacketAsync(IMqttCommunicationAdapter adapter, MqttPubRelPacket pubRelPacket)
  204. {
  205. lock (_unacknowledgedPublishPackets)
  206. {
  207. _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
  208. }
  209. return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier });
  210. }
  211. }
  212. }