You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

100 lines
2.9 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Adapter;
  7. using MQTTnet.Packets;
  8. using MQTTnet.Serializer;
  9. namespace MQTTnet.Core.Tests
  10. {
  /// <summary>
  /// In-memory <see cref="IMqttChannelAdapter"/> used by tests. Packets sent through one
  /// instance are placed directly into the incoming queue of its <see cref="Partner"/>;
  /// this class performs no serialization and no network I/O itself.
  /// </summary>
  public class TestMqttCommunicationAdapter : IMqttChannelAdapter
  {
      // Packets enqueued by the partner's SendPacketsAsync, waiting to be handed out by ReceivePacketAsync.
      private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();

      /// <summary>
      /// The adapter whose incoming queue receives everything this adapter sends.
      /// Must be assigned before <see cref="SendPacketsAsync"/> or <see cref="ReceivePacketAsync"/> is called.
      /// </summary>
      public TestMqttCommunicationAdapter Partner { get; set; }

      /// <summary>Never assigned by this class, so it always reads as <c>null</c>.</summary>
      public string Endpoint { get; }

      /// <summary>Serializer instance exposed to callers; this adapter does not use it itself.</summary>
      public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

      /// <summary>Does nothing.</summary>
      public void Dispose()
      {
          // NOTE(review): _incomingPackets is IDisposable but is left undisposed here - presumably so
          // the partner can still enqueue after this side is disposed; confirm before changing.
      }

      /// <summary>Completes immediately; there is no connection to establish.</summary>
      public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          return Task.FromResult(0);
      }

      /// <summary>Completes immediately; there is no connection to tear down.</summary>
      public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          return Task.FromResult(0);
      }

      /// <summary>
      /// Enqueues every packet, in enumeration order, into the partner's incoming queue.
      /// <paramref name="timeout"/> and <paramref name="cancellationToken"/> are not used.
      /// </summary>
      /// <exception cref="InvalidOperationException"><see cref="Partner"/> is not set.</exception>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="packets"/> is <c>null</c>, or contains a <c>null</c> element.
      /// </exception>
      public Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
      {
          ThrowIfPartnerIsNull();

          // Fail with a descriptive exception instead of a NullReferenceException from the loop below.
          if (packets == null)
          {
              throw new ArgumentNullException(nameof(packets));
          }

          foreach (var packet in packets)
          {
              Partner.EnqueuePacketInternal(packet);
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Waits for the next packet enqueued by the partner.
      /// </summary>
      /// <param name="timeout">
      /// Maximum time to wait. A value less than or equal to <see cref="TimeSpan.Zero"/> means
      /// no timeout: only <paramref name="cancellationToken"/> ends the wait.
      /// </param>
      /// <param name="cancellationToken">Token that aborts the wait.</param>
      /// <returns>
      /// The next packet, or <c>null</c> when the wait ended (timeout, cancellation or any other
      /// failure) before a packet was available.
      /// </returns>
      /// <exception cref="InvalidOperationException"><see cref="Partner"/> is not set.</exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          ThrowIfPartnerIsNull();

          if (timeout <= TimeSpan.Zero)
          {
              return await TakeOrNullAsync(cancellationToken);
          }

          using (var timeoutCts = new CancellationTokenSource(timeout))
          using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken))
          {
              // Await inside the using block so both sources stay alive for the whole wait.
              return await TakeOrNullAsync(linkedCts.Token);
          }
      }

      // Blocks a thread-pool thread on the incoming queue until a packet arrives or the token fires.
      // Any exception thrown by Take is swallowed and reported as null.
      // NOTE(review): the token is also handed to Task.Run, so if it is already cancelled before the
      // work item starts, the returned task is cancelled and awaiting it throws instead of yielding null.
      private Task<MqttBasePacket> TakeOrNullAsync(CancellationToken cancellationToken)
      {
          return Task.Run(() =>
          {
              try
              {
                  return _incomingPackets.Take(cancellationToken);
              }
              catch
              {
                  return null;
              }
          }, cancellationToken);
      }

      // Called by the partner's SendPacketsAsync to deliver a packet into this adapter's queue.
      private void EnqueuePacketInternal(MqttBasePacket packet)
      {
          if (packet == null)
          {
              throw new ArgumentNullException(nameof(packet));
          }

          _incomingPackets.Add(packet);
      }

      // Guard shared by send and receive: both require a configured Partner.
      private void ThrowIfPartnerIsNull()
      {
          if (Partner == null)
          {
              throw new InvalidOperationException("Partner is not set.");
          }
      }
  }
  83. }