You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

99 lines
4.1 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Exceptions;
  7. using MQTTnet.Core.Packets;
  8. using MQTTnet.Core.Protocol;
  9. using Microsoft.Extensions.Logging;
  10. namespace MQTTnet.Core.Server
  11. {
  /// <summary>
  /// Holds the publish packets that are waiting to be sent for one client session and
  /// sends them, one at a time, through a communication adapter on a background task.
  /// </summary>
  public sealed class MqttClientPendingMessagesQueue
  {
      private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
      private readonly MqttServerOptions _options;
      private readonly MqttClientSession _session;
      private readonly ILogger<MqttClientPendingMessagesQueue> _logger;

      /// <summary>
      /// Creates a queue bound to the given session.
      /// </summary>
      /// <param name="options">Server options; <c>DefaultCommunicationTimeout</c> is used for every send.</param>
      /// <param name="session">Session whose <c>ClientId</c> is logged and which is stopped when a send fails.</param>
      /// <param name="logger">Logger for trace, warning and error output.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session, ILogger<MqttClientPendingMessagesQueue> logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
          _session = session ?? throw new ArgumentNullException(nameof(session));
          _options = options ?? throw new ArgumentNullException(nameof(options));
      }

      /// <summary>
      /// Starts the background loop that takes packets from the queue and sends them via <paramref name="adapter"/>.
      /// The method returns immediately; the loop runs until <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      /// <param name="adapter">Adapter used to send the packets.</param>
      /// <param name="cancellationToken">Token that ends the send loop.</param>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
      public void Start(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
      {
          if (adapter == null) throw new ArgumentNullException(nameof(adapter));

          // Fire-and-forget on purpose: the send loop catches and logs its own failures,
          // so the returned task is not awaited or otherwise observed here.
          Task.Run(() => SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken);
      }

      /// <summary>
      /// Adds a publish packet to the queue of packets waiting to be sent.
      /// </summary>
      /// <param name="publishPacket">The packet to queue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="publishPacket"/> is <c>null</c>.</exception>
      public void Enqueue(MqttPublishPacket publishPacket)
      {
          if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

          _pendingPublishPackets.Add(publishPacket);
          _logger.LogTrace("Enqueued packet (ClientId: {0}).", _session.ClientId);
      }

      /// <summary>
      /// Send loop: keeps sending queued packets until cancellation is requested.
      /// Never throws; cancellation is ignored and any other exception is logged as an error.
      /// </summary>
      private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
      {
          try
          {
              while (!cancellationToken.IsCancellationRequested)
              {
                  await SendPendingPublishPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Cancellation simply ends the loop; there is nothing to report.
          }
          catch (Exception exception)
          {
              _logger.LogError(new EventId(), exception, "Unhandled exception while sending enqueued packet (ClientId: {0}).", _session.ClientId);
          }
      }

      /// <summary>
      /// Takes one packet from the queue and sends it. On any failure the failure is logged,
      /// a packet with a quality of service level above <c>AtMostOnce</c> is flagged via <c>Dup</c>
      /// and put back into the queue, and the session is stopped.
      /// </summary>
      private async Task SendPendingPublishPacketAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
      {
          MqttPublishPacket packet = null;
          try
          {
              // Take blocks the current thread until a packet is available or the token is cancelled.
              packet = _pendingPublishPackets.Take(cancellationToken);
              await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);

              _logger.LogTrace("Enqueued packet sent (ClientId: {0}).", _session.ClientId);
          }
          catch (Exception exception)
          {
              LogSendFailure(exception);

              // packet is still null when Take itself failed, so there is nothing to put back in that case.
              if (packet != null && packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
              {
                  packet.Dup = true;
                  // CancellationToken.None: the packet must be put back even if the loop's token is already cancelled.
                  _pendingPublishPackets.Add(packet, CancellationToken.None);
              }

              // NOTE(review): the session is stopped for every failure, including OperationCanceledException.
              // Confirm that MqttClientSession.Stop is safe to call while the session is already stopping.
              _session.Stop();
          }
      }

      /// <summary>
      /// Logs a failed send with a severity that depends on the exception type.
      /// Cancellation is not logged at all.
      /// </summary>
      private void LogSendFailure(Exception exception)
      {
          // The order of the type checks is significant and must be kept as is.
          if (exception is MqttCommunicationTimedOutException)
          {
              _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to timeout (ClientId: {0}).", _session.ClientId);
              return;
          }

          if (exception is MqttCommunicationException)
          {
              _logger.LogWarning(new EventId(), exception, "Sending publish packet failed due to communication exception (ClientId: {0}).", _session.ClientId);
              return;
          }

          if (exception is OperationCanceledException)
          {
              return;
          }

          _logger.LogError(new EventId(), exception, "Sending publish packet failed (ClientId: {0}).", _session.ClientId);
      }
  }
  87. }