You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

MqttClientPendingMessagesQueue.cs 3.5 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Packets;
  9. using MQTTnet.Core.Protocol;
  namespace MQTTnet.Core.Server
  {
      /// <summary>
      /// Holds the publish packets that are waiting to be sent to a client session and
      /// sends them one at a time through a communication adapter on a background task.
      /// </summary>
      public sealed class MqttClientPendingMessagesQueue
      {
          // Unbounded queue: Add never blocks, Take blocks until a packet is available.
          private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
          private readonly MqttClientSession _session;
          private readonly MqttServerOptions _options;

          /// <summary>
          /// Creates a queue bound to the given session.
          /// </summary>
          /// <param name="options">Server options; <c>DefaultCommunicationTimeout</c> is used for every send.</param>
          /// <param name="session">The session that is stopped when sending a packet fails.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="options"/> or <paramref name="session"/> is <c>null</c>.
          /// </exception>
          public MqttClientPendingMessagesQueue(MqttServerOptions options, MqttClientSession session)
          {
              _session = session ?? throw new ArgumentNullException(nameof(session));
              _options = options ?? throw new ArgumentNullException(nameof(options));
          }

          /// <summary>
          /// Starts the background task that sends queued packets through <paramref name="adapter"/>
          /// until <paramref name="cancellationToken"/> is cancelled.
          /// </summary>
          /// <param name="adapter">The adapter used to send the packets.</param>
          /// <param name="cancellationToken">Token that ends the send loop.</param>
          /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
          public void Start(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
          {
              if (adapter == null) throw new ArgumentNullException(nameof(adapter));

              // Fire and forget: the send loop catches and traces its own exceptions.
              Task.Run(() => SendPendingPublishPacketsAsync(adapter, cancellationToken), cancellationToken);
          }

          /// <summary>
          /// Appends a publish packet to the queue of packets waiting to be sent.
          /// </summary>
          /// <param name="publishPacket">The packet to queue.</param>
          /// <exception cref="ArgumentNullException"><paramref name="publishPacket"/> is <c>null</c>.</exception>
          public void Enqueue(MqttPublishPacket publishPacket)
          {
              if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

              _pendingPublishPackets.Add(publishPacket);
          }

          /// <summary>
          /// Send loop: sends queued packets one after another until cancellation is requested.
          /// Cancellation ends the loop silently; any other escaping exception is traced as an error.
          /// </summary>
          private async Task SendPendingPublishPacketsAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
          {
              try
              {
                  while (!cancellationToken.IsCancellationRequested)
                  {
                      await SendPendingPublishPacketAsync(adapter, cancellationToken).ConfigureAwait(false);
                  }
              }
              catch (OperationCanceledException)
              {
                  // Expected when the token is cancelled while waiting for the next packet.
              }
              catch (Exception exception)
              {
                  MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Unhandled exception while sending pending publish packets.");
              }
          }

          /// <summary>
          /// Takes the next packet from the queue (blocking until one is available or the token is
          /// cancelled) and sends it. When sending fails, the failure is traced, a packet with a
          /// quality of service level above <c>AtMostOnce</c> is marked as duplicate and queued
          /// again, and the session is stopped.
          /// </summary>
          private async Task SendPendingPublishPacketAsync(IMqttCommunicationAdapter adapter, CancellationToken cancellationToken)
          {
              // Throws OperationCanceledException on cancellation; handled by the send loop.
              var packet = _pendingPublishPackets.Take(cancellationToken);

              try
              {
                  await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);
              }
              catch (Exception exception)
              {
                  TraceSendFailure(exception);

                  if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
                  {
                      packet.Dup = true;

                      // Deliberately not passing cancellationToken: if the send failed because the
                      // token was cancelled, Add(packet, cancellationToken) would throw from inside
                      // this catch block, dropping the packet and skipping _session.Stop() below.
                      _pendingPublishPackets.Add(packet, CancellationToken.None);
                  }

                  _session.Stop();
              }
          }

          /// <summary>
          /// Traces a failed send exactly once: timeouts and communication failures as warnings,
          /// cancellation not at all, everything else as an error.
          /// </summary>
          private static void TraceSendFailure(Exception exception)
          {
              // The timeout check comes first, as in the original chain, so a timeout is never
              // reported with the generic communication message.
              if (exception is MqttCommunicationTimedOutException)
              {
                  MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to timeout.");
              }
              else if (exception is MqttCommunicationException)
              {
                  MqttNetTrace.Warning(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed due to communication exception.");
              }
              else if (exception is OperationCanceledException)
              {
                  // Cancellation is an expected way for a send to end; nothing to trace.
              }
              else
              {
                  MqttNetTrace.Error(nameof(MqttClientPendingMessagesQueue), exception, "Sending publish packet failed.");
              }
          }
      }
  }