You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

103 lines
3.0 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Adapter;
  7. using MQTTnet.Packets;
  8. using MQTTnet.Serializer;
  9. namespace MQTTnet.Core.Tests
  10. {
  /// <summary>
  /// In-memory <see cref="IMqttChannelAdapter"/> used by the tests. Packets sent through
  /// one instance are queued on its <see cref="Partner"/> and handed out by the partner's
  /// <see cref="ReceivePacketAsync"/>; no network I/O is performed by this class.
  /// </summary>
  public class TestMqttCommunicationAdapter : IMqttChannelAdapter
  {
      // Packets enqueued by the partner adapter, waiting to be returned by ReceivePacketAsync.
      private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();

      /// <summary>
      /// The adapter that receives everything this instance sends. Must be assigned before
      /// <see cref="SendPacketsAsync"/> or <see cref="ReceivePacketAsync"/> is called.
      /// </summary>
      public TestMqttCommunicationAdapter Partner { get; set; }

      /// <summary>Never assigned by this class, so it always returns <c>null</c>.</summary>
      public string Endpoint { get; }

      /// <summary>Serializer instance exposed by this adapter.</summary>
      public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

      /// <summary>Declared for the interface; nothing in this class raises it.</summary>
      public event EventHandler ReadingPacketStarted;

      /// <summary>Declared for the interface; nothing in this class raises it.</summary>
      public event EventHandler ReadingPacketCompleted;

      /// <summary>No-op: this adapter releases nothing on dispose.</summary>
      public void Dispose()
      {
      }

      /// <summary>Completes immediately; both arguments are ignored.</summary>
      public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          return Task.FromResult(0);
      }

      /// <summary>Completes immediately; both arguments are ignored.</summary>
      public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          return Task.FromResult(0);
      }

      /// <summary>
      /// Appends every packet, in enumeration order, to the partner's incoming queue.
      /// <paramref name="timeout"/> and <paramref name="cancellationToken"/> are not used.
      /// </summary>
      /// <param name="timeout">Ignored.</param>
      /// <param name="packets">Packets to deliver to <see cref="Partner"/>.</param>
      /// <param name="cancellationToken">Ignored.</param>
      /// <returns>An already completed task.</returns>
      /// <exception cref="InvalidOperationException"><see cref="Partner"/> is not set.</exception>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="packets"/> is <c>null</c>, or one of its elements is <c>null</c>.
      /// </exception>
      public Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets, CancellationToken cancellationToken)
      {
          // Partner check stays first so the exception order for existing callers is unchanged.
          ThrowIfPartnerIsNull();

          if (packets == null)
          {
              throw new ArgumentNullException(nameof(packets));
          }

          foreach (var packet in packets)
          {
              Partner.EnqueuePacketInternal(packet);
          }

          return Task.FromResult(0);
      }

      /// <summary>
      /// Waits for the next packet queued by the partner.
      /// </summary>
      /// <param name="timeout">
      /// Maximum time to wait. A value less than or equal to <see cref="TimeSpan.Zero"/>
      /// disables the timeout, so only <paramref name="cancellationToken"/> ends the wait.
      /// </param>
      /// <param name="cancellationToken">Token that aborts the wait.</param>
      /// <returns>
      /// The next queued packet, or <c>null</c> when the wait is canceled or times out after
      /// it has started. If the token is already canceled before the queued work starts,
      /// <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/> cancels the
      /// returned task instead.
      /// </returns>
      /// <exception cref="InvalidOperationException"><see cref="Partner"/> is not set.</exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
      {
          ThrowIfPartnerIsNull();

          if (timeout <= TimeSpan.Zero)
          {
              return await TakePacketAsync(cancellationToken);
          }

          using (var timeoutCts = new CancellationTokenSource(timeout))
          using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken))
          {
              // Awaited inside the using block so both sources stay alive for the whole wait.
              return await TakePacketAsync(linkedCts.Token);
          }
      }

      // Runs the blocking Take on the thread pool; cancellation during the wait maps to null.
      private Task<MqttBasePacket> TakePacketAsync(CancellationToken cancellationToken)
      {
          return Task.Run(() =>
          {
              try
              {
                  return _incomingPackets.Take(cancellationToken);
              }
              catch (OperationCanceledException)
              {
                  // Only cancellation/timeout is translated to "no packet"; any other
                  // failure is left to propagate instead of being silently swallowed.
                  return null;
              }
          }, cancellationToken);
      }

      // Called by the partner adapter to hand a packet over to this instance.
      private void EnqueuePacketInternal(MqttBasePacket packet)
      {
          if (packet == null)
          {
              throw new ArgumentNullException(nameof(packet));
          }

          _incomingPackets.Add(packet);
      }

      // Guards the send/receive paths, which both require a configured partner.
      private void ThrowIfPartnerIsNull()
      {
          if (Partner == null)
          {
              throw new InvalidOperationException("Partner is not set.");
          }
      }
  }
  85. }