You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

161 lines
5.6 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Internal;
  9. using MQTTnet.Packets;
  10. using MQTTnet.Protocol;
  11. namespace MQTTnet.Server
  12. {
  13. public sealed class MqttClientPendingMessagesQueue : IDisposable
  14. {
  15. private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
  16. private readonly AsyncAutoResetEvent _queueAutoResetEvent = new AsyncAutoResetEvent();
  17. private readonly IMqttServerOptions _options;
  18. private readonly MqttClientSession _clientSession;
  19. private readonly IMqttNetChildLogger _logger;
  20. private Task _workerTask;
  21. public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger)
  22. {
  23. if (logger == null) throw new ArgumentNullException(nameof(logger));
  24. _options = options ?? throw new ArgumentNullException(nameof(options));
  25. _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));
  26. _logger = logger.CreateChildLogger(nameof(MqttClientPendingMessagesQueue));
  27. }
  28. public int Count => _queue.Count;
  29. public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  30. {
  31. if (adapter == null) throw new ArgumentNullException(nameof(adapter));
  32. if (cancellationToken.IsCancellationRequested)
  33. {
  34. return;
  35. }
  36. _workerTask = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
  37. }
  38. public void WaitForCompletion()
  39. {
  40. if (_workerTask != null)
  41. {
  42. Task.WaitAll(_workerTask);
  43. }
  44. }
  45. public void Enqueue(MqttBasePacket packet)
  46. {
  47. if (packet == null) throw new ArgumentNullException(nameof(packet));
  48. if (_queue.Count >= _options.MaxPendingMessagesPerClient)
  49. {
  50. if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
  51. {
  52. return;
  53. }
  54. if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
  55. {
  56. _queue.TryDequeue(out _);
  57. }
  58. }
  59. _queue.Enqueue(packet);
  60. _queueAutoResetEvent.Set();
  61. _logger.Verbose("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
  62. }
  63. private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  64. {
  65. try
  66. {
  67. while (!cancellationToken.IsCancellationRequested)
  68. {
  69. await TrySendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
  70. }
  71. }
  72. catch (OperationCanceledException)
  73. {
  74. }
  75. catch (Exception exception)
  76. {
  77. _logger.Error(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
  78. }
  79. }
  80. private async Task TrySendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  81. {
  82. MqttBasePacket packet = null;
  83. try
  84. {
  85. if (_queue.IsEmpty)
  86. {
  87. await _queueAutoResetEvent.WaitOneAsync(cancellationToken).ConfigureAwait(false);
  88. }
  89. if (!_queue.TryDequeue(out packet))
  90. {
  91. return;
  92. }
  93. if (cancellationToken.IsCancellationRequested)
  94. {
  95. return;
  96. }
  97. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, new[] { packet }, cancellationToken).ConfigureAwait(false);
  98. _logger.Verbose("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
  99. }
  100. catch (Exception exception)
  101. {
  102. if (exception is MqttCommunicationTimedOutException)
  103. {
  104. _logger.Warning(exception, "Sending publish packet failed: Timeout (ClientId: {0}).", _clientSession.ClientId);
  105. }
  106. else if (exception is MqttCommunicationException)
  107. {
  108. _logger.Warning(exception, "Sending publish packet failed: Communication exception (ClientId: {0}).", _clientSession.ClientId);
  109. }
  110. else if (exception is OperationCanceledException)
  111. {
  112. }
  113. else
  114. {
  115. _logger.Error(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
  116. }
  117. if (packet is MqttPublishPacket publishPacket)
  118. {
  119. if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  120. {
  121. publishPacket.Dup = true;
  122. Enqueue(publishPacket);
  123. }
  124. }
  125. if (!cancellationToken.IsCancellationRequested)
  126. {
  127. _clientSession.Stop(MqttClientDisconnectType.NotClean);
  128. }
  129. }
  130. }
  131. public void Dispose()
  132. {
  133. _queueAutoResetEvent?.Dispose();
  134. }
  135. }
  136. }