You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

106 lines
3.7 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Core.Channel;
  7. using MQTTnet.Core.Client;
  8. using MQTTnet.Core.Diagnostics;
  9. using MQTTnet.Core.Exceptions;
  10. using MQTTnet.Core.Internal;
  11. using MQTTnet.Core.Packets;
  12. using MQTTnet.Core.Serializer;
  13. namespace MQTTnet.Core.Adapter
  14. {
  /// <summary>
  /// Packet-level adapter over an <see cref="IMqttCommunicationChannel"/>: serializes outgoing
  /// packets onto the channel's send stream and reads/deserializes incoming packets from the
  /// channel's receive streams using the configured <see cref="IMqttPacketSerializer"/>.
  /// </summary>
  public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
  {
      private readonly IMqttCommunicationChannel _channel;

      // Scratch buffer reused for packet bodies that fit into it. The MemoryStream handed to the
      // serializer wraps this array directly, so a packet must be deserialized before the next read starts.
      private readonly byte[] _readBuffer = new byte[BufferConstants.Size];

      // Private gate guarding the write chain below; avoids locking on the externally supplied channel.
      private readonly object _sendGate = new object();

      // Tail of the chain of pending writes. Every new write is appended behind it so that writes
      // to the send stream never overlap.
      private Task _sendTask = Task.FromResult(0);

      /// <summary>
      /// Creates an adapter for the given channel and serializer.
      /// </summary>
      /// <param name="channel">The channel used for all stream access.</param>
      /// <param name="serializer">The serializer used to encode and decode packets.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="channel"/> or <paramref name="serializer"/> is <c>null</c>.
      /// </exception>
      public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer)
      {
          _channel = channel ?? throw new ArgumentNullException(nameof(channel));
          PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
      }

      /// <summary>
      /// Gets the serializer used for outgoing and incoming packets.
      /// </summary>
      public IMqttPacketSerializer PacketSerializer { get; }

      /// <summary>
      /// Connects the underlying channel, bounded by <paramref name="timeout"/>.
      /// </summary>
      /// <param name="options">The options forwarded to the channel.</param>
      /// <param name="timeout">The timeout passed to <c>TimeoutAfter</c>.</param>
      public Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
      {
          return _channel.ConnectAsync(options).TimeoutAfter(timeout);
      }

      /// <summary>
      /// Disconnects the underlying channel.
      /// </summary>
      public Task DisconnectAsync()
      {
          return _channel.DisconnectAsync();
      }

      /// <summary>
      /// Serializes the given packets, writes them to the channel's send stream in order and
      /// flushes the stream.
      /// </summary>
      /// <param name="timeout">
      /// Applied to the final flush only (the writes themselves are not bounded by it).
      /// </param>
      /// <param name="packets">The packets to send, in order.</param>
      /// <exception cref="ArgumentNullException"><paramref name="packets"/> is <c>null</c>.</exception>
      public async Task SendPacketsAsync(TimeSpan timeout, IEnumerable<MqttBasePacket> packets)
      {
          if (packets == null)
          {
              throw new ArgumentNullException(nameof(packets));
          }

          Task pendingWrites;
          lock (_sendGate)
          {
              foreach (var packet in packets)
              {
                  MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout);

                  var writeBuffer = PacketSerializer.Serialize(packet);

                  // ContinueWith with a task-returning delegate yields Task<Task>; without Unwrap the
                  // chain would complete as soon as the write was *started*, allowing overlapping
                  // writes, an early flush and unobserved write exceptions.
                  _sendTask = _sendTask
                      .ContinueWith(_ => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length))
                      .Unwrap();
              }

              // Snapshot inside the lock so we await the tail that includes our own writes,
              // not one replaced concurrently by another caller.
              pendingWrites = _sendTask;
          }

          // NOTE: ConfigureAwait(false) is deliberately omitted here; the original author
          // observed a stack overflow when it was used on this await.
          await pendingWrites;
          await _channel.SendStream.FlushAsync().TimeoutAfter(timeout).ConfigureAwait(false);
      }

      /// <summary>
      /// Reads the next packet from the channel and deserializes it.
      /// </summary>
      /// <param name="timeout">
      /// When greater than <see cref="TimeSpan.Zero"/> the read is bounded by this timeout;
      /// otherwise the read waits without a timeout.
      /// </param>
      /// <returns>The deserialized packet.</returns>
      /// <exception cref="MqttProtocolViolationException">The serializer returned <c>null</c>.</exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
      {
          Tuple<MqttPacketHeader, MemoryStream> tuple;
          if (timeout > TimeSpan.Zero)
          {
              // NOTE(review): the timed path reads from RawStream while the untimed path reads from
              // ReceiveStream; the reason is not visible in this file - confirm this is intentional.
              tuple = await ReceiveAsync(_channel.RawStream).TimeoutAfter(timeout).ConfigureAwait(false);
          }
          else
          {
              tuple = await ReceiveAsync(_channel.ReceiveStream).ConfigureAwait(false);
          }

          var packet = PacketSerializer.Deserialize(tuple.Item1, tuple.Item2);
          if (packet == null)
          {
              throw new MqttProtocolViolationException("Received malformed packet.");
          }

          MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "RX <<< {0}", packet);
          return packet;
      }

      /// <summary>
      /// Reads one packet header and its complete body from <paramref name="stream"/>.
      /// </summary>
      /// <param name="stream">The stream to read from.</param>
      /// <returns>
      /// The header together with a stream over the body bytes (an empty stream when the header
      /// reports no body).
      /// </returns>
      /// <exception cref="EndOfStreamException">
      /// The stream ended before the announced body length was received.
      /// </exception>
      private async Task<Tuple<MqttPacketHeader, MemoryStream>> ReceiveAsync(Stream stream)
      {
          var header = MqttPacketReader.ReadHeaderFromSource(stream);
          if (header.BodyLength <= 0)
          {
              return Tuple.Create(header, new MemoryStream());
          }

          var bodyLength = header.BodyLength;

          // Reuse the shared buffer when the body fits; otherwise allocate a dedicated one instead
          // of letting ReadAsync fail with an argument exception for oversized packets.
          var buffer = bodyLength <= _readBuffer.Length ? _readBuffer : new byte[bodyLength];

          var totalRead = 0;
          while (totalRead < bodyLength)
          {
              var read = await stream.ReadAsync(buffer, totalRead, bodyLength - totalRead).ConfigureAwait(false);
              if (read <= 0)
              {
                  // ReadAsync returns 0 at end of stream; without this check the loop would spin forever.
                  throw new EndOfStreamException("The stream ended before the complete packet body was received.");
              }

              totalRead += read;
          }

          return Tuple.Create(header, new MemoryStream(buffer, 0, bodyLength));
      }
  }
  89. }