You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

184 line
7.4 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Internal;
  9. using MQTTnet.Core.Packets;
  10. using MQTTnet.Core.Protocol;
  11. namespace MQTTnet.Core.Server
  12. {
  /// <summary>
  /// Server-side session of a single MQTT client. It owns the client's subscriptions and
  /// outgoing message queue and runs the loop that processes packets received from the
  /// client's communication adapter.
  /// </summary>
  public sealed class MqttClientSession : IDisposable
  {
      // QoS 2 (ExactlyOnce) publications answered with PUBREC that still wait for the matching PUBREL.
      private readonly ConcurrentDictionary<ushort, MqttPublishPacket> _pendingIncomingPublications = new ConcurrentDictionary<ushort, MqttPublishPacket>();
      private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
      private readonly MqttClientMessageQueue _messageQueue;
      private readonly Action<MqttClientSession, MqttPublishPacket> _publishPacketReceivedCallback;
      private readonly MqttServerOptions _options;

      private CancellationTokenSource _cancellationTokenSource;
      private IMqttCommunicationAdapter _adapter;
      private string _identifier;

      // Will message of the current connection; set to null once the client disconnects cleanly.
      private MqttApplicationMessage _willApplicationMessage;

      /// <summary>
      /// Creates a new session.
      /// </summary>
      /// <param name="options">Server options; also passed to the session's message queue.</param>
      /// <param name="publishPacketReceivedCallback">
      /// Invoked with this session and the publish packet whenever a publication from the client
      /// (or the client's will message) has to be handed over to the server.
      /// </param>
      /// <exception cref="ArgumentNullException">Any argument is null.</exception>
      public MqttClientSession(MqttServerOptions options, Action<MqttClientSession, MqttPublishPacket> publishPacketReceivedCallback)
      {
          _options = options ?? throw new ArgumentNullException(nameof(options));
          _publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback));
          _messageQueue = new MqttClientMessageQueue(options);
      }

      /// <summary>
      /// True while an adapter is attached, i.e. from the start of <see cref="RunAsync"/> until its cleanup ran.
      /// </summary>
      public bool IsConnected => _adapter != null;

      /// <summary>
      /// Attaches the adapter to this session and processes incoming packets until the client
      /// disconnects, the connection fails or an unsupported packet is received.
      /// </summary>
      /// <param name="identifier">Client identifier, used in trace messages.</param>
      /// <param name="willApplicationMessage">
      /// Optional will message. It is published through the callback when the session ends,
      /// unless the client ended the session with a DISCONNECT packet.
      /// </param>
      /// <param name="adapter">Adapter used to receive packets from and send packets to the client.</param>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is null.</exception>
      public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
      {
          if (adapter == null) throw new ArgumentNullException(nameof(adapter));

          _willApplicationMessage = willApplicationMessage;

          try
          {
              _identifier = identifier;
              _adapter = adapter;
              _cancellationTokenSource = new CancellationTokenSource();

              _messageQueue.Start(adapter);
              while (!_cancellationTokenSource.IsCancellationRequested)
              {
                  // NOTE(review): TimeSpan.Zero presumably means "no receive timeout" - confirm against the adapter.
                  var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero);
                  await HandleIncomingPacketAsync(packet);
              }
          }
          catch (MqttCommunicationException)
          {
              // Communication failures simply end the session; cleanup happens below.
          }
          catch (Exception exception)
          {
              MqttTrace.Error(nameof(MqttClientSession), exception, $"Client '{_identifier}': Unhandled exception while processing client packets.");
          }
          finally
          {
              try
              {
                  // The field (not the parameter) is checked because a clean DISCONNECT clears it:
                  // the will must only be published when the connection ended without DISCONNECT.
                  var willMessage = _willApplicationMessage;
                  if (willMessage != null)
                  {
                      _publishPacketReceivedCallback(this, willMessage.ToPublishPacket());
                  }
              }
              finally
              {
                  // Always release the session, even if the will callback throws,
                  // so IsConnected cannot stay true for a dead connection.
                  _messageQueue.Stop();
                  _cancellationTokenSource.Cancel();
                  _adapter = null;

                  MqttTrace.Information(nameof(MqttClientSession), $"Client '{_identifier}': Disconnected.");
              }
          }
      }

      /// <summary>
      /// Queues a publish packet for delivery to this client if the client is subscribed to its topic;
      /// otherwise the packet is ignored.
      /// </summary>
      /// <param name="senderClientSession">Session the publication originates from.</param>
      /// <param name="publishPacket">Packet to deliver.</param>
      /// <exception cref="ArgumentNullException">Any argument is null.</exception>
      public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
      {
          if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
          if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

          if (!_subscriptionsManager.IsTopicSubscribed(publishPacket))
          {
              return;
          }

          _messageQueue.Enqueue(senderClientSession, publishPacket);
          MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}': Enqueued pending publish packet.");
      }

      /// <summary>
      /// Requests the receive loop to stop and releases the cancellation token source.
      /// </summary>
      public void Dispose()
      {
          _cancellationTokenSource?.Cancel();
          _cancellationTokenSource?.Dispose();
      }

      /// <summary>
      /// Dispatches a received packet to its handler. DISCONNECT, a repeated CONNECT and any
      /// unsupported packet request the receive loop to stop.
      /// </summary>
      private Task HandleIncomingPacketAsync(MqttBasePacket packet)
      {
          if (packet is MqttSubscribePacket subscribePacket)
          {
              return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
          }

          if (packet is MqttUnsubscribePacket unsubscribePacket)
          {
              return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
          }

          if (packet is MqttPublishPacket publishPacket)
          {
              return HandleIncomingPublishPacketAsync(publishPacket);
          }

          if (packet is MqttPubRelPacket pubRelPacket)
          {
              return HandleIncomingPubRelPacketAsync(pubRelPacket);
          }

          if (packet is MqttPubAckPacket pubAckPacket)
          {
              return HandleIncomingPubAckPacketAsync(pubAckPacket);
          }

          if (packet is MqttPingReqPacket)
          {
              return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
          }

          if (packet is MqttDisconnectPacket)
          {
              // Clean disconnect: the will message must be discarded instead of published.
              _willApplicationMessage = null;
              _cancellationTokenSource.Cancel();
              return Task.FromResult(0);
          }

          if (packet is MqttConnectPacket)
          {
              // A CONNECT inside a running session ends the session; the will (if any) is still published.
              _cancellationTokenSource.Cancel();
              return Task.FromResult(0);
          }

          MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection.");
          _cancellationTokenSource.Cancel();
          return Task.FromResult(0);
      }

      /// <summary>
      /// Handles a PUBACK from the client. Nothing is done with it at the moment.
      /// </summary>
      private Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket)
      {
          return Task.FromResult(0);
      }

      /// <summary>
      /// Handles a PUBLISH from the client according to its quality of service level:
      /// QoS 0 is forwarded directly, QoS 1 is acknowledged with PUBACK and then forwarded,
      /// QoS 2 is stored and acknowledged with PUBREC (it is forwarded when the PUBREL arrives).
      /// </summary>
      private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
      {
          switch (publishPacket.QualityOfServiceLevel)
          {
              case MqttQualityOfServiceLevel.AtMostOnce:
                  _publishPacketReceivedCallback(this, publishPacket);
                  break;

              case MqttQualityOfServiceLevel.AtLeastOnce:
                  await _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
                  _publishPacketReceivedCallback(this, publishPacket);
                  break;

              case MqttQualityOfServiceLevel.ExactlyOnce:
                  _pendingIncomingPublications[publishPacket.PacketIdentifier] = publishPacket;
                  await _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
                  break;
          }
      }

      /// <summary>
      /// Completes a QoS 2 publication: answers the PUBREL with PUBCOMP and forwards the stored
      /// publish packet. A PUBREL without a stored publication is ignored.
      /// </summary>
      private async Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
      {
          // NOTE(review): no PUBCOMP is sent for an unknown packet identifier - confirm this is intended.
          if (!_pendingIncomingPublications.TryRemove(pubRelPacket.PacketIdentifier, out var publishPacket))
          {
              return;
          }

          await _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
          _publishPacketReceivedCallback(this, publishPacket);
      }
  }
  154. }