You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

121 lines
4.6 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Diagnostics;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Packets;
  9. using MQTTnet.Protocol;
  10. namespace MQTTnet.Server
  11. {
  /// <summary>
  /// Holds the packets waiting to be sent to one client session and sends them,
  /// one at a time, through a channel adapter on a background task.
  /// </summary>
  public sealed class MqttClientPendingMessagesQueue : IDisposable
  {
      private readonly ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();

      // Released once for every packet put into _queue, so the send loop can wait
      // asynchronously until there is something to dequeue.
      private readonly SemaphoreSlim _queueWaitSemaphore = new SemaphoreSlim(0);

      private readonly IMqttServerOptions _options;
      private readonly MqttClientSession _session;
      private readonly IMqttNetLogger _logger;

      /// <summary>
      /// Creates an empty queue for the given session.
      /// </summary>
      /// <param name="options">Server options; supplies the timeout used when sending.</param>
      /// <param name="session">Session that owns this queue; stopped when sending fails.</param>
      /// <param name="logger">Logger used for trace, warning and error output.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession session, IMqttNetLogger logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _session = session ?? throw new ArgumentNullException(nameof(session));
          _options = options ?? throw new ArgumentNullException(nameof(options));
      }

      /// <summary>
      /// Starts the background send loop. Does nothing when cancellation has
      /// already been requested.
      /// </summary>
      /// <param name="adapter">Channel adapter the queued packets are sent through.</param>
      /// <param name="cancellationToken">Token that ends the send loop.</param>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
      public void Start(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          if (adapter == null) throw new ArgumentNullException(nameof(adapter));

          if (cancellationToken.IsCancellationRequested)
          {
              return;
          }

          // Fire and forget: the loop catches and logs its own exceptions, so the
          // returned task is intentionally discarded.
          _ = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
      }

      /// <summary>
      /// Adds a packet to the end of the queue and signals the send loop.
      /// </summary>
      /// <param name="packet">Packet to send.</param>
      /// <exception cref="ArgumentNullException"><paramref name="packet"/> is <c>null</c>.</exception>
      public void Enqueue(MqttBasePacket packet)
      {
          if (packet == null) throw new ArgumentNullException(nameof(packet));

          _queue.Enqueue(packet);
          _queueWaitSemaphore.Release();

          _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _session.ClientId);
      }

      /// <summary>
      /// Releases the semaphore used to signal the send loop.
      /// </summary>
      public void Dispose()
      {
          _queueWaitSemaphore.Dispose();
      }

      // Sends queued packets until cancellation is requested. Never throws:
      // cancellation is swallowed and anything else is logged.
      private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await SendQueuedPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
          }
          catch (Exception exception)
          {
              _logger.Error<MqttClientPendingMessagesQueue>(exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
          }
      }

      // Waits for the next queued packet and sends it. On failure the error is
      // logged, a publish packet with QoS above "at most once" is put back into the
      // queue with its DUP flag set, and the session is stopped unless the failure
      // happened while cancellation was already requested.
      private async Task SendQueuedPacketAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken)
      {
          MqttBasePacket packet = null;
          try
          {
              await _queueWaitSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

              if (!_queue.TryDequeue(out packet))
              {
                  // The semaphore is released once per enqueued packet, so a
                  // successful wait should always find a packet.
                  throw new InvalidOperationException("The pending messages queue was signaled but contains no packet.");
              }

              await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);

              _logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
          }
          catch (Exception exception)
          {
              LogSendFailure(exception);

              if (packet is MqttPublishPacket publishPacket
                  && publishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  // Keep the message for a later attempt and mark it as a duplicate.
                  publishPacket.Dup = true;
                  _queue.Enqueue(publishPacket);
                  _queueWaitSemaphore.Release();
              }

              // When cancellation was requested the send loop is already being shut
              // down, so stopping the session again would be redundant.
              // NOTE(review): assumes the token is cancelled as part of stopping the
              // session - confirm against the caller of Start.
              if (!cancellationToken.IsCancellationRequested)
              {
                  await _session.StopAsync().ConfigureAwait(false);
              }
          }
      }

      // Logs a failed send with a severity matching the exception type.
      // Cancellation is not logged at all.
      private void LogSendFailure(Exception exception)
      {
          if (exception is MqttCommunicationTimedOutException)
          {
              _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
          }
          else if (exception is MqttCommunicationException)
          {
              _logger.Warning<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
          }
          else if (exception is OperationCanceledException)
          {
          }
          else
          {
              _logger.Error<MqttClientPendingMessagesQueue>(exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
          }
      }
  }
  105. }