選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

TestMqttCommunicationAdapter.cs 2.8 KiB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using MQTTnet.Adapter;
  6. using MQTTnet.Packets;
  7. using MQTTnet.Serializer;
  namespace MQTTnet.Core.Tests
  {
      /// <summary>
      /// In-memory <see cref="IMqttChannelAdapter"/> used by tests. A packet sent through one
      /// instance is placed directly into the incoming queue of its <see cref="Partner"/> and is
      /// handed out by the partner's <see cref="ReceivePacketAsync"/>; this class performs no
      /// serialization and no network I/O.
      /// </summary>
      public class TestMqttCommunicationAdapter : IMqttChannelAdapter
      {
          // Packets enqueued by the partner that have not been received yet.
          private readonly BlockingCollection<MqttBasePacket> _incomingPackets = new BlockingCollection<MqttBasePacket>();

          /// <summary>
          /// The adapter that receives what this instance sends. It must be assigned before
          /// <see cref="SendPacketAsync"/> or <see cref="ReceivePacketAsync"/> is called.
          /// </summary>
          public TestMqttCommunicationAdapter Partner { get; set; }

          /// <summary>Never assigned by this class, so it always reads as <c>null</c>.</summary>
          public string Endpoint { get; }

          /// <summary>Serializer instance exposed to callers; this class itself never uses it.</summary>
          public IMqttPacketSerializer PacketSerializer { get; } = new MqttPacketSerializer();

          /// <summary>Never raised by this class.</summary>
          public event EventHandler ReadingPacketStarted;

          /// <summary>Never raised by this class.</summary>
          public event EventHandler ReadingPacketCompleted;

          /// <summary>
          /// Does nothing. In particular the packet queue is not disposed, so a partner can
          /// still enqueue packets into this instance after the call.
          /// </summary>
          public void Dispose()
          {
          }

          /// <summary>Returns an already completed task; both arguments are ignored.</summary>
          public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
          {
              return Task.FromResult(0);
          }

          /// <summary>Returns an already completed task; both arguments are ignored.</summary>
          public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
          {
              return Task.FromResult(0);
          }

          /// <summary>
          /// Enqueues <paramref name="packet"/> on the partner's incoming queue and returns an
          /// already completed task.
          /// </summary>
          /// <param name="packet">The packet to hand to the partner.</param>
          /// <param name="cancellationToken">Ignored.</param>
          /// <exception cref="InvalidOperationException"><see cref="Partner"/> is <c>null</c>.</exception>
          /// <exception cref="ArgumentNullException"><paramref name="packet"/> is <c>null</c>.</exception>
          /// <remarks>
          /// The method is not <c>async</c>, so both exceptions are thrown synchronously to the
          /// caller instead of being stored in the returned task.
          /// </remarks>
          public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
          {
              ThrowIfPartnerIsNull();
              Partner.EnqueuePacketInternal(packet);
              return Task.FromResult(0);
          }

          /// <summary>
          /// Waits for the next packet enqueued by the partner.
          /// </summary>
          /// <param name="timeout">
          /// Maximum time to wait. A value less than or equal to <see cref="TimeSpan.Zero"/>
          /// disables the timeout, leaving <paramref name="cancellationToken"/> as the only way
          /// to end the wait.
          /// </param>
          /// <param name="cancellationToken">Token that aborts the wait.</param>
          /// <returns>
          /// The received packet, or <c>null</c> when the timeout elapses or cancellation is
          /// requested while waiting. If the effective token is already cancelled before the
          /// background work item starts, <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/>
          /// cancels the task instead, so awaiting it throws rather than yielding <c>null</c>.
          /// </returns>
          /// <exception cref="InvalidOperationException">
          /// <see cref="Partner"/> is <c>null</c> (reported through the returned task).
          /// </exception>
          public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken)
          {
              ThrowIfPartnerIsNull();

              if (timeout <= TimeSpan.Zero)
              {
                  return await TakePacketOrNullAsync(cancellationToken).ConfigureAwait(false);
              }

              // The await stays inside the using blocks so both sources outlive the wait.
              using (var timeoutCts = new CancellationTokenSource(timeout))
              using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, cancellationToken))
              {
                  return await TakePacketOrNullAsync(linkedCts.Token).ConfigureAwait(false);
              }
          }

          // Blocks a thread-pool thread on the queue until a packet arrives or the token fires.
          // Only the shutdown conditions documented for BlockingCollection<T>.Take are mapped to
          // null; any other exception faults the task instead of being mistaken for a timeout.
          private Task<MqttBasePacket> TakePacketOrNullAsync(CancellationToken cancellationToken)
          {
              return Task.Run(() =>
              {
                  try
                  {
                      return _incomingPackets.Take(cancellationToken);
                  }
                  catch (OperationCanceledException)
                  {
                      // Timeout elapsed or the caller cancelled while waiting.
                      return null;
                  }
                  catch (ObjectDisposedException)
                  {
                      // The token's source was disposed while waiting.
                      return null;
                  }
              }, cancellationToken);
          }

          // Called by the partner's SendPacketAsync to deliver a packet to this instance.
          private void EnqueuePacketInternal(MqttBasePacket packet)
          {
              if (packet == null)
              {
                  throw new ArgumentNullException(nameof(packet));
              }

              _incomingPackets.Add(packet);
          }

          // Guards the operations that make no sense without a counterpart adapter.
          private void ThrowIfPartnerIsNull()
          {
              if (Partner == null)
              {
                  throw new InvalidOperationException("Partner is not set.");
              }
          }
      }
  }