You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

123 lines
4.8 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Packets;
  9. using MQTTnet.Protocol;
  10. namespace MQTTnet.Server
  11. {
  /// <summary>
  /// Queue of packets waiting to be sent to a single client session.
  /// Packets are added with <see cref="Enqueue"/> and sent one at a time by the
  /// background task started in <see cref="Start"/>.
  /// </summary>
  public sealed class MqttClientPendingMessagesQueue : IDisposable
  {
      private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();

      // Released once per enqueued packet so the sender can wait asynchronously
      // until there is something to dequeue.
      private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);

      private readonly IMqttServerOptions _options;
      private readonly MqttClientSession _clientSession;
      private readonly IMqttNetLogger _logger;

      /// <summary>
      /// Creates the queue for the given client session.
      /// </summary>
      /// <param name="options">Server options; supplies the communication timeout used when sending.</param>
      /// <param name="clientSession">The session this queue belongs to.</param>
      /// <param name="logger">Logger used for trace, warning and error output.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetLogger logger)
      {
          _options = options ?? throw new ArgumentNullException(nameof(options));
          _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      }

      /// <summary>
      /// Number of packets currently waiting in the queue.
      /// </summary>
      public int Count => _queue.Count;

      /// <summary>
      /// Starts a background task which sends the queued packets through
      /// <paramref name="adapter"/> until <paramref name="cancellationToken"/> is cancelled.
      /// Returns immediately; does nothing when the token is already cancelled.
      /// </summary>
      /// <param name="adapter">Channel adapter used to send the packets.</param>
      /// <param name="cancellationToken">Token which stops the send loop.</param>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
      public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          if (adapter == null) throw new ArgumentNullException(nameof(adapter));

          if (cancellationToken.IsCancellationRequested)
          {
              return;
          }

          // Fire and forget: the send loop catches and logs its own exceptions,
          // so the task is intentionally not observed.
          _ = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
      }

      /// <summary>
      /// Adds a packet to the end of the queue and signals the send loop.
      /// </summary>
      /// <param name="packet">The packet to send.</param>
      /// <exception cref="ArgumentNullException"><paramref name="packet"/> is <c>null</c>.</exception>
      public void Enqueue(MqttBasePacket packet)
      {
          if (packet == null) throw new ArgumentNullException(nameof(packet));

          _queue.Enqueue(packet);
          _queueWaitSemaphore.Release();

          _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId);
      }

      /// <summary>
      /// Send loop: sends one queued packet per iteration until cancellation is requested.
      /// Cancellation is swallowed; any other exception is logged and ends the loop.
      /// </summary>
      private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await SendQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _clientSession.ClientId);
          }
      }

      /// <summary>
      /// Waits for a packet to become available, dequeues it and sends it.
      /// On any failure the error is logged, a publish packet with a QoS above
      /// "at most once" is put back into the queue and the client session is stopped.
      /// </summary>
      private async Task SendQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          MqttBasePacket packet = null;
          try
          {
              await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

              if (!_queue.TryDequeue(out packet))
              {
                  // The semaphore is released once per enqueued packet, so a
                  // successful wait without a packet means the two got out of sync.
                  throw new InvalidOperationException("The pending messages queue was signaled but contains no packet.");
              }

              await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);

              _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
          }
          catch (Exception exception)
          {
              LogSendFailure(exception);
              RequeueForRedelivery(packet);

              // NOTE(review): this also runs when the wait/send was cancelled;
              // confirm StopAsync tolerates being called during shutdown.
              await _clientSession.StopAsync().ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Logs a failed send with a severity depending on the exception type.
      /// </summary>
      private void LogSendFailure(Exception exception)
      {
          // The timeout case is tested before the general communication case
          // so that it gets its own message.
          switch (exception)
          {
              case MqttCommunicationTimedOutException _:
                  _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _clientSession.ClientId);
                  break;

              case MqttCommunicationException _:
                  _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _clientSession.ClientId);
                  break;

              case OperationCanceledException _:
                  // Cancellation is not reported as an error; nothing is logged.
                  break;

              default:
                  _logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _clientSession.ClientId);
                  break;
          }
      }

      /// <summary>
      /// Puts a publish packet with a QoS above "at most once" back into the queue,
      /// flagged as duplicate. All other packets (and <c>null</c>) are dropped.
      /// </summary>
      private void RequeueForRedelivery(MqttBasePacket packet)
      {
          if (!(packet is MqttPublishPacket publishPacket))
          {
              return;
          }

          if (publishPacket.QualityOfServiceLevel <= MqttQualityOfServiceLevel.AtMostOnce)
          {
              return;
          }

          publishPacket.Dup = true;

          // The packet is appended to the tail, i.e. behind packets enqueued in the meantime.
          _queue.Enqueue(packet);
          _queueWaitSemaphore.Release();
      }

      /// <summary>
      /// Disposes the semaphore used to signal the send loop.
      /// </summary>
      public void Dispose()
      {
          _queueWaitSemaphore.Dispose();
      }
  }
  106. }