You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

141 regels
5.2 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Packets;
  9. using MQTTnet.Protocol;
  10. namespace MQTTnet.Server
  11. {
  12. public sealed class MqttClientPendingMessagesQueue : IDisposable
  13. {
  14. private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
  15. private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);
  16. private readonly IMqttServerOptions _options;
  17. private readonly MqttClientSession _clientSession;
  18. private readonly IMqttNetLogger _logger;
  19. private Task _workerTask;
  20. public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger)
  21. {
  22. _options = options ?? throw new ArgumentNullException(nameof(options));
  23. _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));
  24. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
  25. }
  26. public int Count => _queue.Count;
  27. public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  28. {
  29. if (adapter == null) throw new ArgumentNullException(nameof(adapter));
  30. if (cancellationToken.IsCancellationRequested)
  31. {
  32. return;
  33. }
  34. _workerTask = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
  35. }
  36. public void WaitForCompletion()
  37. {
  38. if (_workerTask != null)
  39. {
  40. Task.WaitAll(_workerTask);
  41. }
  42. }
  43. public void Enqueue(MqttBasePacket packet)
  44. {
  45. if (packet == null) throw new ArgumentNullException(nameof(packet));
  46. _queue.Enqueue(packet);
  47. _queueWaitSemaphore.Release();
  48. _logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
  49. }
  50. private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  51. {
  52. try
  53. {
  54. while (!cancellationToken.IsCancellationRequested)
  55. {
  56. await SendNextQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
  57. }
  58. }
  59. catch (OperationCanceledException)
  60. {
  61. }
  62. catch (Exception exception)
  63. {
  64. _logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
  65. }
  66. }
  67. private async Task SendNextQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
  68. {
  69. MqttBasePacket packet = null;
  70. try
  71. {
  72. await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
  73. if (!_queue.TryDequeue(out packet))
  74. {
  75. throw new InvalidOperationException(); // should not happen
  76. }
  77. if (cancellationToken.IsCancellationRequested)
  78. {
  79. return;
  80. }
  81. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);
  82. _logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
  83. }
  84. catch (Exception exception)
  85. {
  86. if (exception is MqttCommunicationTimedOutException)
  87. {
  88. _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId);
  89. }
  90. else if (exception is MqttCommunicationException)
  91. {
  92. _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId);
  93. }
  94. else if (exception is OperationCanceledException)
  95. {
  96. }
  97. else
  98. {
  99. _logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
  100. }
  101. if (packet is MqttPublishPacket publishPacket)
  102. {
  103. if (publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
  104. {
  105. publishPacket.Dup = true;
  106. _queue.Enqueue(packet);
  107. _queueWaitSemaphore.Release();
  108. }
  109. }
  110. if (!cancellationToken.IsCancellationRequested)
  111. {
  112. await _clientSession.StopAsync().ConfigureAwait(false);
  113. }
  114. }
  115. }
  116. public void Dispose()
  117. {
  118. _queueWaitSemaphore?.Dispose();
  119. }
  120. }
  121. }