You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

84 lines
3.1 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Core.Adapter;
  6. using MQTTnet.Core.Diagnostics;
  7. using MQTTnet.Core.Exceptions;
  8. using MQTTnet.Core.Packets;
  9. using System.Linq;
  10. namespace MQTTnet.Core.Server
  11. {
  12. public sealed class MqttClientMessageQueue
  13. {
  14. private readonly BlockingCollection<MqttPublishPacket> _pendingPublishPackets = new BlockingCollection<MqttPublishPacket>();
  15. private readonly MqttServerOptions _options;
  16. private CancellationTokenSource _cancellationTokenSource;
  17. public MqttClientMessageQueue(MqttServerOptions options)
  18. {
  19. _options = options ?? throw new ArgumentNullException(nameof(options));
  20. }
  21. public void Start(IMqttCommunicationAdapter adapter)
  22. {
  23. if (_cancellationTokenSource != null)
  24. {
  25. throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started.");
  26. }
  27. if (adapter == null) throw new ArgumentNullException(nameof(adapter));
  28. _cancellationTokenSource = new CancellationTokenSource();
  29. Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token, adapter), _cancellationTokenSource.Token);
  30. }
  31. public void Stop()
  32. {
  33. _cancellationTokenSource?.Cancel();
  34. _cancellationTokenSource = null;
  35. _pendingPublishPackets?.Dispose();
  36. }
  37. public void Enqueue(MqttPublishPacket publishPacket)
  38. {
  39. if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
  40. _pendingPublishPackets.Add(publishPacket);
  41. }
  42. private async Task SendPendingPublishPacketsAsync(CancellationToken cancellationToken, IMqttCommunicationAdapter adapter)
  43. {
  44. var consumable = _pendingPublishPackets.GetConsumingEnumerable();
  45. while (!cancellationToken.IsCancellationRequested)
  46. {
  47. var packets = consumable.Take(_pendingPublishPackets.Count).ToList();
  48. try
  49. {
  50. await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, packets).ConfigureAwait(false);
  51. }
  52. catch (MqttCommunicationException exception)
  53. {
  54. MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
  55. foreach (var publishPacket in packets)
  56. {
  57. publishPacket.Dup = true;
  58. _pendingPublishPackets.Add(publishPacket);
  59. }
  60. }
  61. catch (Exception exception)
  62. {
  63. MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
  64. foreach (var publishPacket in packets)
  65. {
  66. publishPacket.Dup = true;
  67. _pendingPublishPackets.Add(publishPacket);
  68. }
  69. }
  70. }
  71. }
  72. }
  73. }