You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

193 rivejä
7.5 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. namespace MQTTnet.Core.Server
  12. {
  13. public sealed class MqttClientSession : IDisposable
  14. {
  15. private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();
  16. private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
  17. private readonly MqttClientMessageQueue _messageQueue;
  18. private readonly Action<MqttClientSession, MqttPublishPacket> _publishPacketReceivedCallback;
  19. private readonly MqttServerOptions _options;
  20. private CancellationTokenSource _cancellationTokenSource;
  21. private string _identifier;
  22. private MqttApplicationMessage _willApplicationMessage;
  23. public MqttClientSession(string clientId, MqttServerOptions options, Action<MqttClientSession, MqttPublishPacket> publishPacketReceivedCallback)
  24. {
  25. ClientId = clientId;
  26. _options = options ?? throw new ArgumentNullException(nameof(options));
  27. _publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback));
  28. _messageQueue = new MqttClientMessageQueue(options);
  29. }
  30. public string ClientId { get; }
  31. public bool IsConnected => Adapter != null;
  32. public IMqttCommunicationAdapter Adapter { get; private set; }
  33. public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
  34. {
  35. if (adapter == null) throw new ArgumentNullException(nameof(adapter));
  36. _willApplicationMessage = willApplicationMessage;
  37. try
  38. {
  39. _identifier = identifier;
  40. Adapter = adapter;
  41. _cancellationTokenSource = new CancellationTokenSource();
  42. _messageQueue.Start(adapter);
  43. while (!_cancellationTokenSource.IsCancellationRequested)
  44. {
  45. var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false);
  46. await HandleIncomingPacketAsync(packet).ConfigureAwait(false);
  47. }
  48. }
  49. catch (MqttCommunicationException)
  50. {
  51. }
  52. catch (Exception exception)
  53. {
  54. MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
  55. }
  56. finally
  57. {
  58. if (willApplicationMessage != null)
  59. {
  60. _publishPacketReceivedCallback(this, _willApplicationMessage.ToPublishPacket());
  61. }
  62. _messageQueue.Stop();
  63. _cancellationTokenSource.Cancel();
  64. Adapter = null;
  65. MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", _identifier);
  66. }
  67. }
  68. public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
  69. {
  70. if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
  71. if (!_subscriptionsManager.IsSubscribed(publishPacket))
  72. {
  73. return;
  74. }
  75. _messageQueue.Enqueue(publishPacket);
  76. MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier);
  77. }
  78. public void Dispose()
  79. {
  80. _cancellationTokenSource?.Cancel();
  81. _cancellationTokenSource?.Dispose();
  82. }
  83. private Task HandleIncomingPacketAsync(MqttBasePacket packet)
  84. {
  85. if (packet is MqttSubscribePacket subscribePacket)
  86. {
  87. return Adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
  88. }
  89. if (packet is MqttUnsubscribePacket unsubscribePacket)
  90. {
  91. return Adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
  92. }
  93. if (packet is MqttPublishPacket publishPacket)
  94. {
  95. return HandleIncomingPublishPacketAsync(publishPacket);
  96. }
  97. if (packet is MqttPubRelPacket pubRelPacket)
  98. {
  99. return HandleIncomingPubRelPacketAsync(pubRelPacket);
  100. }
  101. if (packet is MqttPubRecPacket pubRecPacket)
  102. {
  103. return Adapter.SendPacketAsync(pubRecPacket.CreateResponse<MqttPubRelPacket>(), _options.DefaultCommunicationTimeout);
  104. }
  105. if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
  106. {
  107. // Discard message.
  108. return Task.FromResult((object)null);
  109. }
  110. if (packet is MqttPingReqPacket)
  111. {
  112. return Adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
  113. }
  114. if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
  115. {
  116. _cancellationTokenSource.Cancel();
  117. return Task.FromResult((object)null);
  118. }
  119. MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet);
  120. _cancellationTokenSource.Cancel();
  121. return Task.FromResult((object)null);
  122. }
  123. private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
  124. {
  125. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
  126. {
  127. _publishPacketReceivedCallback(this, publishPacket);
  128. return Task.FromResult(0);
  129. }
  130. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
  131. {
  132. _publishPacketReceivedCallback(this, publishPacket);
  133. return Adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
  134. }
  135. if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
  136. {
  137. // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
  138. lock (_unacknowledgedPublishPackets)
  139. {
  140. _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
  141. }
  142. _publishPacketReceivedCallback(this, publishPacket);
  143. return Adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
  144. }
  145. throw new MqttCommunicationException("Received a not supported QoS level.");
  146. }
  147. private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
  148. {
  149. lock (_unacknowledgedPublishPackets)
  150. {
  151. _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
  152. }
  153. return Adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
  154. }
  155. }
  156. }