You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

109 lines
3.8 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Packets;
  9. namespace MQTTnet.Core.Server
  10. {
  /// <summary>
  /// Buffers outgoing publish packets and forwards them, one at a time, to an
  /// <see cref="IMqttCommunicationAdapter"/> from a background task.
  /// </summary>
  /// <remarks>
  /// Packets are accepted through <see cref="Enqueue"/> at any time. They are only
  /// transmitted while the queue is started; a packet whose transmission throws is
  /// put back into the queue and retried.
  /// </remarks>
  public sealed class MqttClientMessageQueue
  {
      // Lives as long as this instance. It is intentionally never disposed by Stop(),
      // because Enqueue() and the consumer loop keep using it and
      // BlockingCollection<T>.Dispose() is documented as not thread-safe.
      private readonly BlockingCollection<MqttClientPublishPacketContext> _pendingPublishPackets = new BlockingCollection<MqttClientPublishPacketContext>();
      private readonly MqttServerOptions _options;

      // Non-null exactly while the queue is started; doubles as the "already started" flag.
      private CancellationTokenSource _cancellationTokenSource;
      private IMqttCommunicationAdapter _adapter;

      /// <summary>
      /// Creates a new, stopped queue.
      /// </summary>
      /// <param name="options">Server options; its default communication timeout is passed to every send.</param>
      /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
      public MqttClientMessageQueue(MqttServerOptions options)
      {
          _options = options ?? throw new ArgumentNullException(nameof(options));
      }

      /// <summary>
      /// Starts the background task that drains the queue into <paramref name="adapter"/>.
      /// </summary>
      /// <param name="adapter">The adapter that queued packets are sent through.</param>
      /// <exception cref="InvalidOperationException">The queue is already started.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <c>null</c>.</exception>
      public void Start(IMqttCommunicationAdapter adapter)
      {
          if (_cancellationTokenSource != null)
          {
              throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started.");
          }

          _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
          _cancellationTokenSource = new CancellationTokenSource();

          // Read the token on the calling thread. The lambda runs later on a pool thread,
          // and by then Stop() may already have reset the field to null.
          var cancellationToken = _cancellationTokenSource.Token;
          Task.Run(() => SendPendingPublishPacketsAsync(cancellationToken), cancellationToken);
      }

      /// <summary>
      /// Detaches the adapter and signals the background task to finish.
      /// Does nothing harmful when the queue is not started.
      /// </summary>
      /// <remarks>
      /// The pending-packet collection is left intact, so <see cref="Enqueue"/> keeps
      /// working and <see cref="Start"/> may be called again afterwards.
      /// </remarks>
      public void Stop()
      {
          _adapter = null;
          _cancellationTokenSource?.Cancel();

          // The source is not disposed here: the consumer task may still be observing its token.
          _cancellationTokenSource = null;
      }

      /// <summary>
      /// Appends a publish packet to the queue.
      /// </summary>
      /// <param name="publishPacket">The packet to queue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="publishPacket"/> is <c>null</c>.</exception>
      public void Enqueue(MqttPublishPacket publishPacket)
      {
          if (publishPacket == null)
          {
              throw new ArgumentNullException(nameof(publishPacket));
          }

          _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
      }

      /// <summary>
      /// Consumer loop: takes packets from the queue until <paramref name="cancellationToken"/> is cancelled.
      /// </summary>
      private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken)
      {
          try
          {
              foreach (var publishPacketContext in _pendingPublishPackets.GetConsumingEnumerable(cancellationToken))
              {
                  if (cancellationToken.IsCancellationRequested)
                  {
                      // Stopped between taking the packet and sending it; the packet is not sent.
                      return;
                  }

                  if (_adapter == null)
                  {
                      // No adapter attached; the packet taken in this iteration is skipped.
                      continue;
                  }

                  try
                  {
                      await TrySendPendingPublishPacketAsync(publishPacketContext).ConfigureAwait(false);
                  }
                  catch (Exception exception)
                  {
                      // Keep the loop alive whatever a single packet does.
                      MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Error while sending pending publish packets.");
                  }
              }
          }
          catch (OperationCanceledException)
          {
              // Expected: GetConsumingEnumerable throws this when Stop() cancels the token
              // while the loop is waiting for the next packet.
          }
      }

      /// <summary>
      /// Sends a single packet through the current adapter. On failure the packet is logged
      /// and put back into the queue; the send-try counter is incremented in every case.
      /// </summary>
      private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketContext publishPacketContext)
      {
          try
          {
              // Snapshot the field so a concurrent Stop() cannot null it between the check and the call.
              var adapter = _adapter;
              if (adapter == null)
              {
                  return;
              }

              // Any attempt after the first one is flagged as a duplicate delivery.
              publishPacketContext.PublishPacket.Dup = publishPacketContext.SendTries > 0;
              await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, publishPacketContext.PublishPacket).ConfigureAwait(false);

              publishPacketContext.IsSent = true;
          }
          catch (MqttCommunicationException exception)
          {
              MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
              _pendingPublishPackets.Add(publishPacketContext);
          }
          catch (Exception exception)
          {
              MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
              _pendingPublishPackets.Add(publishPacketContext);
          }
          finally
          {
              publishPacketContext.SendTries++;
          }
      }
  }
  93. }